Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add audit tool to extract inconsistencies between users and buckets #1202

Merged
merged 4 commits into from
Jul 31, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion rel/files/riak-cs-admin
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ cd $RUNNER_BASE_DIR
SCRIPT=`basename $0`

usage() {
echo "Usage: $SCRIPT { status | gc | access | storage | stanchion | cluster-info | cleanup-orphan-multipart }"
echo "Usage: $SCRIPT { status | gc | access | storage | stanchion | cluster-info | cleanup-orphan-multipart | audit-bucket-ownership}"
}

# Check the first argument for instructions
Expand Down Expand Up @@ -107,6 +107,11 @@ case "$1" in

$NODETOOL rpc riak_cs_console cleanup_orphan_multipart "$@"
;;
audit[_-]bucket[_-]ownership)
shift
node_up_check
$NODETOOL rpc riak_cs_console audit_bucket_ownership "$@"
;;
*)
usage
exit 1
Expand Down
11 changes: 10 additions & 1 deletion src/riak_cs_bucket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
update_bucket_record/1,
delete_all_uploads/2,
delete_old_uploads/3,
fold_all_buckets/3
fold_all_buckets/3,
fetch_bucket_keys/1
]).

-include("riak_cs.hrl").
Expand Down Expand Up @@ -778,3 +779,11 @@ update_user_buckets(User, Bucket) ->
{ok, ignore}
end
end.

%% @doc Grab the whole list of Riak CS bucket keys.
-spec fetch_bucket_keys(riak_client()) -> {ok, [binary()]} | {error, term()}.
fetch_bucket_keys(RcPid) ->
{ok, MasterPbc} = riak_cs_riak_client:master_pbc(RcPid),
Timeout = riak_cs_config:list_keys_list_buckets_timeout(),
riak_cs_pbc:list_keys(MasterPbc, ?BUCKETS_BUCKET, Timeout,
[riakc, list_all_bucket_keys]).
2 changes: 2 additions & 0 deletions src/riak_cs_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
delete_gckey_timeout/0,
list_keys_list_objects_timeout/0,
list_keys_list_users_timeout/0,
list_keys_list_buckets_timeout/0,
storage_calc_timeout/0,
list_objects_timeout/0, %% using mapred (v0)
fold_objects_timeout/0, %% for cs_bucket_fold
Expand Down Expand Up @@ -466,6 +467,7 @@ local_get_block_timeout() ->
?TIMEOUT_CONFIG_FUNC(delete_gckey_timeout).
?TIMEOUT_CONFIG_FUNC(list_keys_list_objects_timeout).
?TIMEOUT_CONFIG_FUNC(list_keys_list_users_timeout).
?TIMEOUT_CONFIG_FUNC(list_keys_list_buckets_timeout).
?TIMEOUT_CONFIG_FUNC(storage_calc_timeout).
?TIMEOUT_CONFIG_FUNC(list_objects_timeout).
?TIMEOUT_CONFIG_FUNC(fold_objects_timeout).
Expand Down
83 changes: 83 additions & 0 deletions src/riak_cs_console.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
-export([
status/1,
cluster_info/1,
audit_bucket_ownership/1,
cleanup_orphan_multipart/0,
cleanup_orphan_multipart/1
]).
Expand Down Expand Up @@ -62,6 +63,88 @@ cluster_info([OutFile]) ->
error
end.

%% @doc Audit bucket ownership by comparing information between
%% users bucket and buckets bucket.
audit_bucket_ownership(_Args) when is_list(_Args) ->
audit_bucket_ownership().

audit_bucket_ownership() ->
{ok, RcPid} = riak_cs_riak_client:start_link([]),
try audit_bucket_ownership0(RcPid) of
[] ->
io:format("No Inconsistencies found.~n");
Inconsistencies ->
io:format("~p.~n", [Inconsistencies])
after
riak_cs_riak_client:stop(RcPid)
end.

%% Construct two sets of {User, Bucket} tuples and
%% return differences of subtraction in both directions, also output logs.
audit_bucket_ownership0(RcPid) ->
FromUsers = ownership_from_users(RcPid),
{FromBuckets, OwnedBy} = ownership_from_buckets(RcPid),
lager:debug("FromUsers: ~p~n", [lists:usort(gb_sets:to_list(FromUsers))]),
lager:debug("FromBuckets: ~p~n", [lists:usort(gb_sets:to_list(FromBuckets))]),
lager:debug("OwnedBy: ~p~n", [lists:usort(gb_trees:to_list(OwnedBy))]),
Inconsistencies0 =
gb_sets:fold(
fun ({U, B}, Acc) ->
lager:info(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would also be nice to output console.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By discussion in chat, output to console may be very noisy when large number of inconsistencies.

"Bucket is not tracked by user: {User, Bucket} = {~s, ~s}",
[U, B]),
[{not_tracked, {U, B}} | Acc]
end, [], gb_sets:subtract(FromBuckets, FromUsers)),
gb_sets:fold(
fun({U,B}, Acc) ->
case gb_trees:lookup(B, OwnedBy) of
none ->
lager:info(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

"Bucket does not exist: {User, Bucket} = {~s, ~s}", [U, B]),
[{no_bucket_object, {U, B}} | Acc];
{value, DifferentUser} ->
lager:info(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

"Bucket is owned by different user: {User, Bucket, DifferentUser} = "
"{~s, ~s, ~s}", [U, B, DifferentUser]),
[{different_user, {U, B, DifferentUser}} | Acc]
end
end, Inconsistencies0, gb_sets:subtract(FromUsers, FromBuckets)).

ownership_from_users(RcPid) ->
{ok, UserKeys} = riak_cs_user:fetch_user_keys(RcPid),
lists:foldl(
fun(UserKey, Ownership) ->
UserStr = binary_to_list(UserKey),
{ok, {RCSUser, _Obj}} = riak_cs_user:get_user(UserStr, RcPid),
?RCS_USER{buckets=Buckets} = RCSUser,
lists:foldl(
fun(?RCS_BUCKET{name=BucketStr}, InnerOwnership) ->
gb_sets:add_element({UserKey, list_to_binary(BucketStr)},
InnerOwnership)
end, Ownership, Buckets)
end, gb_sets:new(), UserKeys).

ownership_from_buckets(RcPid) ->
{ok, BucketKeys} = riak_cs_bucket:fetch_bucket_keys(RcPid),
lists:foldl(
fun(BucketKey, {Ownership, OwnedBy}) ->
case riak_cs_bucket:fetch_bucket_object(BucketKey, RcPid) of
{error, no_such_bucket} ->
{Ownership, OwnedBy};
{ok, BucketObj} ->
OwnerIds = riakc_obj:get_values(BucketObj),
%% Raise badmatch when something wrong, too sloppy?
{ok, Owner} =
case lists:usort(OwnerIds) of
[Uniq] -> {ok, Uniq};
[] -> {error, {no_bucket_owner, BucketKey}};
Owners -> {error, {multiple_owners, BucketKey, Owners}}
end,
{gb_sets:add_element({Owner, BucketKey}, Ownership),
gb_trees:enter(BucketKey, Owner, OwnedBy)}
end
end, {gb_sets:new(), gb_trees:empty()}, BucketKeys).

%% @doc This function is for operation, esp cleaning up multipart
%% uploads which not completed nor aborted, and after that - bucket
%% deleted. Due to riak_cs/#475, this had been possible and Riak CS
Expand Down
1 change: 1 addition & 0 deletions src/riak_cs_stats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ duration_metrics() ->
[riakc, fold_manifest_objs],
[riakc, mapred_storage],
[riakc, list_all_user_keys],
[riakc, list_all_bucket_keys],
[riakc, list_all_manifest_keys],
[riakc, list_users_receive_chunk],
[riakc, get_uploads_by_index],
Expand Down
24 changes: 9 additions & 15 deletions src/riak_cs_storage_d.erl
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,15 @@ start_batch(Options, Time, State) ->
%% socket process, so "starting" one here is the same as opening a
%% connection, and avoids duplicating the configuration lookup code
{ok, RcPid} = riak_cs_riak_client:start_link([]),
Batch = fetch_user_list(RcPid),
Batch =
case riak_cs_user:fetch_user_keys(RcPid) of
{ok, UserKeys} -> UserKeys;
{error, Error} ->
_ = lager:error("Storage calculator was unable"
" to fetch list of users (~p)",
[Error]),
[]
end,

gen_fsm:send_event(?SERVER, continue),
State#state{batch_start=BatchStart,
Expand All @@ -361,20 +369,6 @@ start_batch(Options, Time, State) ->
detailed=Detailed,
leeway_edge=LeewayEdge}.

%% @doc Grab the whole list of Riak CS users.
fetch_user_list(RcPid) ->
{ok, MasterPbc} = riak_cs_riak_client:master_pbc(RcPid),
Timeout = riak_cs_config:list_keys_list_users_timeout(),
case riak_cs_pbc:list_keys(MasterPbc, ?USER_BUCKET, Timeout,
[riakc, list_all_user_keys]) of
{ok, Users} -> Users;
{error, Error} ->
_ = lager:error("Storage calculator was unable"
" to fetch list of users (~p)",
[Error]),
[]
end.

%% @doc Compute storage for the next user in the batch.
calculate_next_user(#state{riak_client=RcPid,
batch=[User|Rest],
Expand Down
11 changes: 10 additions & 1 deletion src/riak_cs_user.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
save_user/3,
update_key_secret/1,
update_user/3,
key_id/1
key_id/1,
fetch_user_keys/1
]).

-include("riak_cs.hrl").
Expand Down Expand Up @@ -236,6 +237,14 @@ display_name(Email) ->
Index = string:chr(Email, $@),
string:sub_string(Email, 1, Index-1).

%% @doc Grab the whole list of Riak CS user keys.
-spec fetch_user_keys(riak_client()) -> {ok, [binary()]} | {error, term()}.
fetch_user_keys(RcPid) ->
{ok, MasterPbc} = riak_cs_riak_client:master_pbc(RcPid),
Timeout = riak_cs_config:list_keys_list_users_timeout(),
riak_cs_pbc:list_keys(MasterPbc, ?USER_BUCKET, Timeout,
[riakc, list_all_user_keys]).

%% ===================================================================
%% Internal functions
%% ===================================================================
Expand Down