Skip to content

Commit

Permalink
Merge pull request #1233 from basho/feature/optimize-list-objects
Browse files Browse the repository at this point in the history
Merge CV's list-objects prefix optimization + Add key length limit [JIRA: RCS-275]

Reviewed-by: shino

Conflicts:
	src/riak_cs_s3_response.erl
	src/riak_cs_wm_object.erl
	src/riak_cs_wm_object_upload_part.erl
  • Loading branch information
borshop authored and kuenishi committed Sep 18, 2015
1 parent 4c215f6 commit f7ada0f
Show file tree
Hide file tree
Showing 15 changed files with 155 additions and 48 deletions.
2 changes: 2 additions & 0 deletions include/riak_cs.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -504,3 +504,5 @@

-define(OBJECT_BUCKET_PREFIX, <<"0o:">>). % Version # = 0
-define(BLOCK_BUCKET_PREFIX, <<"0b:">>). % Version # = 0

-define(MAX_S3_KEY_LENGTH, 1024).
6 changes: 6 additions & 0 deletions rel/files/riak_cs.schema
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@
{datatype, [integer, {atom, unlimited}]}
]}.

%% @doc Max length of a key. Default is 1024.
{mapping, "max_key_length", "riak_cs.max_key_length", [
{default, 1024},
{datatype, [integer, {atom, unlimited}]}
]}.

%% @doc Switch whether Riak CS trusts 'X-Forwarded-For' header.
%% If your load balancer adds 'X-Forwarded-For' header
%% and it is reliable (able to gaurantee it is not added
Expand Down
7 changes: 7 additions & 0 deletions riak_test/src/list_objects_test_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ test(UserConfig) ->
?assert(lists:member([{prefix, "0/"}], CommonPrefixes3)),
verify_object_list(ObjList4, 30),

%% Don't fail even if Prefix is longer than 1024, even if keys are
%% restricted to be shorter than it. That's S3.
TooLongKey = "0/" ++ binary_to_list(binary:copy(<<"b">>, 1025)) ++ "/",
OptionsX = [{prefix, TooLongKey}],
ObjListX = erlcloud_s3:list_objects(?TEST_BUCKET, OptionsX, UserConfig),
?assertEqual([], proplists:get_value(contents, ObjListX)),

delete_objects(?TEST_BUCKET, Count3, [], UserConfig),
delete_objects(?TEST_BUCKET, 4, Prefix1, UserConfig),
delete_objects(?TEST_BUCKET, 4, Prefix2, UserConfig),
Expand Down
13 changes: 13 additions & 0 deletions riak_test/tests/object_get_test.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ confirm() ->
non_mp_get_cases(UserConfig),
mp_get_cases(UserConfig),
timestamp_skew_cases(UserConfig),
long_key_cases(UserConfig),
rtcs:pass().

non_mp_get_cases(UserConfig) ->
Expand Down Expand Up @@ -164,6 +165,18 @@ timestamp_skew_cases(UserConfig) ->
meck:unload(httpd_util)
end.

long_key_cases(UserConfig) ->
LongKey = binary_to_list(binary:copy(<<"a">>, 1024)),
TooLongKey = binary_to_list(binary:copy(<<"b">>, 1025)),
Data = <<"pocketburger">>,
?assertEqual([{version_id,"null"}],
erlcloud_s3:put_object(?TEST_BUCKET, LongKey, Data, UserConfig)),
ErrorString = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><Error>"
"<Code>KeyTooLongError</Code><Message>Your key is too long</Message><Size>1025</Size>"
"<MaxSizeAllowed>1024</MaxSizeAllowed><RequestId></RequestId></Error>",
?assertError({aws_error, {http_error, 400, [], ErrorString}},
erlcloud_s3:put_object(?TEST_BUCKET, TooLongKey, Data, UserConfig)).

mb(MegaBytes) ->
MegaBytes * 1024 * 1024.

Expand Down
5 changes: 5 additions & 0 deletions src/riak_cs_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
auto_reconnect/0,
is_multibag_enabled/0,
max_buckets_per_user/0,
max_key_length/0,
read_before_last_manifest_write/0,
region/0,
stanchion/0,
Expand Down Expand Up @@ -393,6 +394,10 @@ is_multibag_enabled() ->
max_buckets_per_user() ->
get_env(riak_cs, max_buckets_per_user, ?DEFAULT_MAX_BUCKETS_PER_USER).

-spec max_key_length() -> pos_integer() | unlimited.
max_key_length() ->
get_env(riak_cs, max_key_length, ?MAX_S3_KEY_LENGTH).

%% @doc Return `stanchion' configuration data.
-spec stanchion() -> {string(), pos_integer(), boolean()}.
stanchion() ->
Expand Down
5 changes: 3 additions & 2 deletions src/riak_cs_list_objects_fsm_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -344,11 +344,12 @@ response_from_manifests_and_common_prefixes(Request,

-spec make_2i_request(riak_client(), state()) ->
{state(), {ok, reference()} | {error, term()}}.
make_2i_request(RcPid, State=#state{req=?LOREQ{name=BucketName},
make_2i_request(RcPid, State=#state{req=?LOREQ{name=BucketName,prefix=Prefix},
fold_objects_batch_size=BatchSize}) ->
ManifestBucket = riak_cs_utils:to_bucket_name(objects, BucketName),
StartKey = make_start_key(State),
EndKey = riak_cs_utils:big_end_key(128),
EndKey = riak_cs_utils:big_end_key(Prefix),

NewStateData = State#state{last_request_start_key=StartKey,
last_request_num_keys_requested=BatchSize},
NewStateData2 = update_profiling_state_with_start(NewStateData,
Expand Down
53 changes: 44 additions & 9 deletions src/riak_cs_s3_response.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@

-type error_reason() :: atom()
| {'riak_connect_failed', term()}
| {'malformed_policy_version', string()}.
| {'malformed_policy_version', string()}
| {'invalid_argument', string()}
| {'key_too_long', pos_integer()}.

-spec error_message(error_reason()) -> string().
error_message(invalid_access_key_id) ->
Expand All @@ -61,6 +63,8 @@ error_message(bucket_already_exists) ->
"The requested bucket name is not available. The bucket namespace is shared by all users of the system. Please select a different name and try again.";
error_message(toomanybuckets) ->
"You have attempted to create more buckets than allowed";
error_message({key_too_long, _}) ->
"Your key is too long";
error_message(user_already_exists) ->
"The specified email address has already been registered. Email addresses must be unique among all users of the system. Please try again with a different email address.";
error_message(entity_too_large) ->
Expand Down Expand Up @@ -115,6 +119,7 @@ error_code(reqtime_tooskewed) -> "RequestTimeTooSkewed";
error_code(bucket_not_empty) -> "BucketNotEmpty";
error_code(bucket_already_exists) -> "BucketAlreadyExists";
error_code(toomanybuckets) -> "TooManyBuckets";
error_code({key_too_long, _}) -> "KeyTooLongError";
error_code(user_already_exists) -> "UserAlreadyExists";
error_code(entity_too_large) -> "EntityTooLarge";
error_code(entity_too_small) -> "EntityTooSmall";
Expand Down Expand Up @@ -158,14 +163,16 @@ error_code(ErrorName) ->
%% http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html

-spec status_code(error_reason()) -> pos_integer().
status_code(invalid_access_key_id) -> 403;
status_code(invalid_email_address) -> 400;
status_code(access_denied) -> 403;
status_code(reqtime_tooskewed) -> 403;
status_code(bucket_not_empty) -> 409;
status_code(bucket_already_exists) -> 409;
status_code(user_already_exists) -> 409;
status_code(toomanybuckets) -> 400;
status_code(invalid_access_key_id) -> 403;
status_code(invalid_email_address) -> 400;
status_code(access_denied) -> 403;
status_code(copy_source_access_denied) -> 403;
status_code(reqtime_tooskewed) -> 403;
status_code(bucket_not_empty) -> 409;
status_code(bucket_already_exists) -> 409;
status_code(user_already_exists) -> 409;
status_code(toomanybuckets) -> 400;
status_code({key_too_long, _}) -> 400;
%% yes, 400, really, not 413
status_code(entity_too_large) -> 400;
status_code(entity_too_small) -> 400;
Expand Down Expand Up @@ -240,6 +247,10 @@ api_error({Tag, _}=Error, RD, Ctx)
Ctx);
api_error({toomanybuckets, Current, BucketLimit}, RD, Ctx) ->
toomanybuckets_response(Current, BucketLimit, RD, Ctx);
api_error({invalid_argument, Name, Value}, RD, Ctx) ->
invalid_argument_response(Name, Value, RD, Ctx);
api_error({key_too_long, Len}, RD, Ctx) ->
key_too_long(Len, RD, Ctx);
api_error({error, Reason}, RD, Ctx) ->
api_error(Reason, RD, Ctx).

Expand Down Expand Up @@ -267,6 +278,30 @@ toomanybuckets_response(Current,BucketLimit,RD,Ctx) ->
Body = riak_cs_xml:to_xml([XmlDoc]),
respond(status_code(toomanybuckets), Body, RD, Ctx).

invalid_argument_response(Name, Value, RD, Ctx) ->
XmlDoc = {'Error',
[
{'Code', [error_code(invalid_argument)]},
{'Message', [error_message({invalid_argument, Name})]},
{'ArgumentName', [Name]},
{'ArgumentValue', [Value]},
{'RequestId', [""]}
]},
Body = riak_cs_xml:to_xml([XmlDoc]),
respond(status_code(invalid_argument), Body, RD, Ctx).

key_too_long(Len, RD, Ctx) ->
XmlDoc = {'Error',
[
{'Code', [error_code({key_too_long, Len})]},
{'Message', [error_message({key_too_long, Len})]},
{'Size', [Len]},
{'MaxSizeAllowed', [riak_cs_config:max_key_length()]},
{'RequestId', [""]}
]},
Body = riak_cs_xml:to_xml([XmlDoc]),
respond(status_code(invalid_argument), Body, RD, Ctx).

copy_object_response(Manifest, RD, Ctx) ->
copy_response(Manifest, 'CopyObjectResult', RD, Ctx).

Expand Down
20 changes: 15 additions & 5 deletions src/riak_cs_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -521,14 +521,24 @@ update_obj_value(Obj, Value) when is_binary(Value) ->
key_exists(RcPid, Bucket, Key) ->
key_exists_handle_get_manifests(riak_cs_manifest:get_manifests(RcPid, Bucket, Key)).

-spec big_end_key(non_neg_integer()) -> binary().
big_end_key(NumBytes) ->
MaxByte = <<255:8/integer>>,
iolist_to_binary([MaxByte || _ <- lists:seq(1, NumBytes)]).

-spec big_end_key() -> binary().
big_end_key() ->
big_end_key(128).
big_end_key(<<>>).

-spec big_end_key(Prefix::binary() | undefined) -> binary().
big_end_key(undefined) ->
big_end_key(<<>>);
big_end_key(Prefix) ->
Padding = case riak_cs_config:max_key_length() of
unlimited ->
<<>>;
MaxLen when byte_size(Prefix) > MaxLen ->
<<>>;
MaxLen ->
binary:copy(<<255>>, MaxLen - byte_size(Prefix))
end,
<<Prefix/binary, 255, Padding/binary>>.

%% @doc Return `stanchion' configuration data.
-spec stanchion_data() -> {string(), pos_integer(), boolean()}.
Expand Down
23 changes: 16 additions & 7 deletions src/riak_cs_wm_object.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,23 @@ init(Ctx) ->
{ok, Ctx#context{local_context=#key_context{}}}.

-spec malformed_request(#wm_reqdata{}, #context{}) -> {false, #wm_reqdata{}, #context{}}.
malformed_request(RD, Ctx) ->
ContextWithKey = riak_cs_wm_utils:extract_key(RD, Ctx),
case riak_cs_wm_utils:has_canned_acl_and_header_grant(RD) of
true ->
riak_cs_s3_response:api_error(canned_acl_and_header_grant,
malformed_request(RD, #context{response_module=ResponseMod} = Ctx) ->
case riak_cs_wm_utils:extract_key(RD, Ctx) of
{error, Reason} ->
ResponseMod:api_error(Reason, RD, Ctx);
{ok, ContextWithKey} ->
case riak_cs_wm_utils:has_canned_acl_and_header_grant(RD) of
true ->
ResponseMod:api_error(canned_acl_and_header_grant,
RD, ContextWithKey);
false ->
{false, RD, ContextWithKey}
false ->
case riak_cs_copy_object:malformed_request(RD) of
{true, Reason} ->
ResponseMod:api_error(Reason, RD, ContextWithKey);
false ->
{false, RD, ContextWithKey}
end
end
end.

%% @doc Get the type of access requested and the manifest with the
Expand Down
13 changes: 8 additions & 5 deletions src/riak_cs_wm_object_acl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,17 @@ init(Ctx) ->
{ok, Ctx#context{local_context=#key_context{}}}.

-spec malformed_request(#wm_reqdata{}, #context{}) -> {false, #wm_reqdata{}, #context{}}.
malformed_request(RD,Ctx) ->
malformed_request(RD, #context{response_module=ResponseMod} = Ctx) ->
case riak_cs_wm_utils:has_acl_header_and_body(RD) of
true ->
riak_cs_s3_response:api_error(unexpected_content,
RD, Ctx);
ResponseMod:api_error(unexpected_content, RD, Ctx);
false ->
NewCtx = riak_cs_wm_utils:extract_key(RD, Ctx),
{false, RD, NewCtx}
case riak_cs_wm_utils:extract_key(RD, Ctx) of
{error, Reason} ->
ResponseMod:api_error(Reason, RD, Ctx);
{ok, NewCtx} ->
{false, RD, NewCtx}
end
end.

%% @doc Get the type of access requested and the manifest with the
Expand Down
18 changes: 11 additions & 7 deletions src/riak_cs_wm_object_upload.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,18 @@ init(Ctx) ->
{ok, Ctx#context{local_context=#key_context{}}}.

-spec malformed_request(#wm_reqdata{}, #context{}) -> {false, #wm_reqdata{}, #context{}}.
malformed_request(RD,Ctx) ->
ContextWithKey = riak_cs_wm_utils:extract_key(RD, Ctx),
case riak_cs_wm_utils:has_canned_acl_and_header_grant(RD) of
true ->
riak_cs_s3_response:api_error(canned_acl_and_header_grant,
malformed_request(RD, #context{response_module=ResponseMod} = Ctx) ->
case riak_cs_wm_utils:extract_key(RD, Ctx) of
{error, Reason} ->
ResponseMod:api_error(Reason, RD, Ctx);
{ok, ContextWithKey} ->
case riak_cs_wm_utils:has_canned_acl_and_header_grant(RD) of
true ->
ResponseMod:api_error(canned_acl_and_header_grant,
RD, ContextWithKey);
false ->
{false, RD, ContextWithKey}
false ->
{false, RD, ContextWithKey}
end
end.

%% @doc Get the type of access requested and the manifest with the
Expand Down
14 changes: 9 additions & 5 deletions src/riak_cs_wm_object_upload_part.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,20 @@ init(Ctx) ->
{ok, Ctx#context{local_context=#key_context{}}}.

-spec malformed_request(#wm_reqdata{}, #context{}) ->
{false, #wm_reqdata{}, #context{}} | {{halt, pos_integer()}, #wm_reqdata{}, #context{}}.
malformed_request(RD,Ctx) ->
{false, #wm_reqdata{}, #context{}} | {{halt, pos_integer()}, #wm_reqdata{}, #context{}}.
malformed_request(RD, #context{response_module=ResponseMod} = Ctx) ->
Method = wrq:method(RD),
case Method == 'PUT' andalso not valid_part_number(RD) of
%% For multipart upload part,
true ->
riak_cs_s3_response:api_error(invalid_part_number, RD, Ctx);
ResponseMod:api_error(invalid_part_number, RD, Ctx);
false ->
NewCtx = riak_cs_wm_utils:extract_key(RD, Ctx),
{false, RD, NewCtx}
case riak_cs_wm_utils:extract_key(RD, Ctx) of
{error, Reason} ->
ResponseMod:api_error(Reason, RD, Ctx);
{ok, ContextWithKey} ->
{false, RD, ContextWithKey}
end
end.

valid_part_number(RD) ->
Expand Down
21 changes: 14 additions & 7 deletions src/riak_cs_wm_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -407,17 +407,24 @@ iso_8601_to_erl_date(Date) ->
{b2i(Hr), b2i(Mn), b2i(Sc)}}
end.

%% @doc Return a new context where the bucket and key for the s3 object
%% have been inserted.
-spec extract_key(#wm_reqdata{}, #context{}) -> #context{}.
%% @doc Return a new context where the bucket and key for the s3
%% object have been inserted. It also does key length check. TODO: do
%% we check if the key is valid Unicode string or not?
-spec extract_key(#wm_reqdata{}, #context{}) ->
{ok, #context{}} | {error, {key_too_long, pos_integer()}}.
extract_key(RD,Ctx=#context{local_context=LocalCtx0}) ->
Bucket = list_to_binary(wrq:path_info(bucket, RD)),
%% need to unquote twice since we re-urlencode the string during rewrite in
%% order to trick webmachine dispatching
Key = mochiweb_util:unquote(mochiweb_util:unquote(wrq:path_info(object, RD))),
LocalCtx = LocalCtx0#key_context{bucket=Bucket, key=Key},
Ctx#context{bucket=Bucket,
local_context=LocalCtx}.
MaxKeyLen = riak_cs_config:max_key_length(),
case mochiweb_util:unquote(mochiweb_util:unquote(wrq:path_info(object, RD))) of
Key when length(Key) =< MaxKeyLen ->
LocalCtx = LocalCtx0#key_context{bucket=Bucket, key=Key},
{ok, Ctx#context{bucket=Bucket,
local_context=LocalCtx}};
Key ->
{error, {key_too_long, length(Key)}}
end.

extract_name(User) when is_list(User) ->
User;
Expand Down
1 change: 1 addition & 0 deletions test/riak_cs_config_test.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ default_config_test() ->
cuttlefish_unit:assert_not_configured(Config, "riak_cs.auth_module"),
cuttlefish_unit:assert_config(Config, "riak_cs.fold_objects_for_list_keys", true),
cuttlefish_unit:assert_config(Config, "riak_cs.max_buckets_per_user", 100),
cuttlefish_unit:assert_config(Config, "riak_cs.max_key_length", 1024),
cuttlefish_unit:assert_config(Config, "riak_cs.trust_x_forwarded_for", false),
cuttlefish_unit:assert_config(Config, "riak_cs.leeway_seconds", 86400),
cuttlefish_unit:assert_config(Config, "riak_cs.gc_interval", 900),
Expand Down
2 changes: 1 addition & 1 deletion test/riak_cs_s3_rewrite_eqc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ prop_s3_rewrite(Style) ->
%% <<"0o:", hash(Bucket)/binary>> and Key - The key should be exactly
%% same as the original one in the client-app before URL encoding.
Ctx = #context{local_context=#key_context{}},
#context{local_context=LocalCtx} = riak_cs_wm_utils:extract_key(RD, Ctx),
{ok, #context{local_context=LocalCtx}} = riak_cs_wm_utils:extract_key(RD, Ctx),

%% ?debugVal(CSKey),
{CSBucket, CSKey} =:=
Expand Down

0 comments on commit f7ada0f

Please sign in to comment.