Skip to content

Commit

Permalink
Merge pull request #1134 from basho/feature/offline-gc-tool-squashed
Browse files Browse the repository at this point in the history
Offline deletion tool that searches and collects stale blocks

Reviewed-by: shino
  • Loading branch information
borshop committed Apr 30, 2015
2 parents 06be181 + 885f839 commit 83a7eb2
Show file tree
Hide file tree
Showing 2 changed files with 372 additions and 0 deletions.
168 changes: 168 additions & 0 deletions priv/tools/internal/offline_delete.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
#!/usr/bin/env escript

%% ---------------------------------------------------------------------
%%
%% Copyright (c) 2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% ---------------------------------------------------------------------

-module(offline_delete).

-compile(export_all).
-mode(compile).

%% @doc This is an offline deletion script that directly opens
%% bitcask files, reads a file listing the keys and partitions that
%% should be deleted, and then deletes them without going through
%% Riak KV.
%%
%% Note: make sure you remove the AAE trees after this script has run,
%% and turn off AAE on the other nodes running in the cluster.

%% @doc escript entry point.
%%
%% Usage: `[--dry-run] [--old-format] <BitcaskDir> <BlocksListFile>'
%%
%% `--dry-run' looks the keys up instead of deleting them and
%% `--old-format' selects bitcask key version 0. The two flags are
%% accepted in either order. Any other argument list prints the usage
%% text to standard error.
main(["--dry-run", "--old-format", BitcaskDir, BlocksListFile]) ->
    offline_delete(BitcaskDir, BlocksListFile, true, true);
main(["--old-format", "--dry-run", BitcaskDir, BlocksListFile]) ->
    %% Same as the clause above; the flag order should not matter.
    offline_delete(BitcaskDir, BlocksListFile, true, true);
main(["--dry-run", BitcaskDir, BlocksListFile]) ->
    offline_delete(BitcaskDir, BlocksListFile, true, false);
main(["--old-format", BitcaskDir, BlocksListFile]) ->
    offline_delete(BitcaskDir, BlocksListFile, false, true);
main([BitcaskDir, BlocksListFile]) ->
    offline_delete(BitcaskDir, BlocksListFile, false, false);
main(_) ->
    io:format(standard_error,
              "options: [--dry-run] [--old-format] <BitcaskDir> <BlocksListFile>~n"
              "\033[31m\033[1m[Caution] Make sure Riak is not running!!!\033[0m~n"
              "It'd be better if all hinted handoff have been finished before stopping Riak.~n", []).

%% @doc Opens every entry of `BitcaskDir' as a bitcask in `read_write'
%% mode and returns an orddict mapping the entry name, converted to an
%% integer, to the bitcask reference.
%%
%% Crashes with `{Entry, Result}' when `bitcask:open/2' does not return
%% a reference, and with `badarg' when an entry name is not an integer.
-spec open_all_bitcask(file:filename()) ->
          orddict:orddict(non_neg_integer(), reference()).
open_all_bitcask(BitcaskDir) ->
    {ok, Entries} = file:list_dir(BitcaskDir),
    orddict:from_list([open_partition(BitcaskDir, Entry) || Entry <- Entries]).

%% Opens one directory entry and pairs its integer name with the handle.
open_partition(BitcaskDir, Entry) ->
    case bitcask:open(filename:join(BitcaskDir, Entry), [read_write]) of
        Ref when is_reference(Ref) ->
            {list_to_integer(Entry), Ref};
        Other ->
            error({Entry, Other})
    end.

%% @doc Closes every bitcask reference held in `Bitcasks'.
%%
%% Always returns `ok', as the spec states; the result of each
%% `bitcask:close/1' call is not inspected.
-spec close_all_bitcask(orddict:orddict(non_neg_integer(), reference())) -> ok.
close_all_bitcask(Bitcasks) ->
    lists:foreach(fun({_Idx, Ref}) ->
                          bitcask:close(Ref)
                  end, orddict:to_list(Bitcasks)).

%% New bitcask 1.7 format (Riak 2.0 or later)
%% ?VERSION_1 is the key version offline_delete/4 selects unless
%% --old-format is given; ?VERSION_BYTE is the value make_bk/3 writes
%% into the leading 7 bits of a version-1 bitcask key.
-define(VERSION_1, 1).
-define(VERSION_BYTE, ?VERSION_1).

%% @doc Prints a warning about `Dir' to standard error and asks the
%% operator for confirmation on standard input.
%%
%% Returns the answer (`"y\n"') when the operator confirms. Any other
%% answer, including end of input, prints "Aborted." and halts the
%% script with exit status 1 instead of failing with a badmatch.
make_sure(Dir) ->
    io:format(standard_error,
              "\033[31m[Warning]\033\[0m~n"
              "Make sure any Riak process using '~s' is not running "
              "or your data may corrupt.~n", [filename:absname(Dir)]),
    case io:get_line("Accept the terms of conditions? [y/N] ") of
        "y\n" = Answer ->
            Answer;
        _Refused ->
            io:format(standard_error, "Aborted.~n", []),
            halt(1)
    end.

%% @doc Runs the whole offline deletion.
%%
%% Asks for confirmation, opens the block list file and every bitcask
%% under `BitcaskDir', processes the list with for_each_line/5 and
%% reports the number of blocks handled. With `DryRun' set the keys are
%% only looked up. `OldFormat' selects bitcask key version 0 instead of
%% ?VERSION_1.
%%
%% The bitcask handles and the list file are released in `after'
%% sections, so they are closed even when processing crashes.
%% Returns `ok'.
offline_delete(BitcaskDir, BlocksListFile, DryRun, OldFormat) ->
    make_sure(BitcaskDir),
    {ok, Fd} = file:open(BlocksListFile, [read]),
    try
        BC = open_all_bitcask(BitcaskDir),
        try
            delete_listed_blocks(Fd, BC, BitcaskDir, DryRun, OldFormat)
        after
            close_all_bitcask(BC)
        end
    after
        file:close(Fd)
    end.

%% Processes the already opened block list against the opened bitcasks
%% and prints the progress and summary messages.
delete_listed_blocks(Fd, BC, BitcaskDir, DryRun, OldFormat) ->
    io:format(standard_error, "~p bitcask directories at ~s opened.~n",
              [length(BC), BitcaskDir]),
    BKVersion = case OldFormat of
                    false -> ?VERSION_1;
                    true -> 0
                end,
    io:format(standard_error, "Using bitcask key version: ~p.~n",
              [BKVersion]),
    {ok, Deleted} = for_each_line(Fd, BC, DryRun, 0, BKVersion),
    io:format(standard_error, "~p blocks at ~s was deleted"
              " (dry run: ~p).~n",
              [Deleted, BitcaskDir, DryRun]).

%% @doc Reads the block list from `Fd' line by line and hands every
%% listed block to maybe_delete/6 once per partition index on the line.
%%
%% Each line must consist of seven whitespace-separated fields: three
%% partition indexes, the hex-encoded bucket, the hex-encoded key, a
%% UUID and a sequence number (the last two are not used here).
%%
%% `Count' is the running total of the values returned by
%% maybe_delete/6. A progress message is printed whenever `Count rem
%% 1000' is exactly 500 on entry; because the count can grow by up to
%% three per line, some of these milestones may be skipped.
%%
%% Returns `{ok, Count}' at end of file, or `{error, Reason}' after
%% reporting a read error.
for_each_line(Fd, BC, DryRun, Count, BKVersion) ->
    case Count rem 1000 of
        500 ->
            io:format(standard_error,
                      "~p blocks has been deleted.~n",
                      [Count]);
        _ ->
            noop
    end,
    case file:read_line(Fd) of
        {ok, Line} ->
            [V1, V2, V3, B, K, _UUIDStr, _SeqNo] = string:tokens(Line, "\t \n"),
            Bucket = mochihex:to_bin(B),
            Key = mochihex:to_bin(K),
            Deleted = lists:sum([maybe_delete(BC, list_to_integer(V),
                                              Bucket, Key, DryRun, BKVersion)
                                 || V <- [V1, V2, V3]]),
            for_each_line(Fd, BC, DryRun, Count + Deleted, BKVersion);
        eof ->
            {ok, Count};
        {error, Reason} ->
            %% The format arguments must be a list; passing the bare
            %% reason made io:format/3 itself fail.
            io:format(standard_error, "Error: ~p~n", [Reason]),
            {error, Reason}
    end.

%% @doc Handles one block key on one partition.
%%
%% When a bitcask for partition `Idx' is present in `BC', the key built
%% by make_bk/3 is looked up (dry run) or deleted there. Returns 1 when
%% that call answers `{ok, _}' or `ok'; any other answer is printed to
%% standard error and counted as 0. Returns 0 without touching anything
%% when no bitcask is open for `Idx'.
maybe_delete(BC, Idx, Bucket, Key, DryRun, BKVersion) ->
    case orddict:find(Idx, BC) of
        error ->
            %% No bitcask was opened for this partition index. Ignore.
            0;
        {ok, Bitcask} ->
            BitcaskKey = make_bk(BKVersion, Bucket, Key),
            tally(get_or_delete(DryRun, Bitcask, BitcaskKey))
    end.

%% A dry run only reads the key; a real run deletes it.
get_or_delete(true, Bitcask, BitcaskKey) ->
    bitcask:get(Bitcask, BitcaskKey);
get_or_delete(false, Bitcask, BitcaskKey) ->
    bitcask:delete(Bitcask, BitcaskKey).

%% Converts the bitcask answer into the number of blocks to count.
tally({ok, _Value}) ->
    1;
tally(ok) ->
    1;
tally(Error) ->
    io:format(standard_error, "error: ~p~n", [Error]),
    0.

%% @doc Builds the bitcask key for a bucket/key pair.
%%
%% Version 0 is the old bitcask format (Riak 1.4 or before): the key is
%% `term_to_binary({Bucket, Key})'.
%%
%% Version 1 is the new bitcask 1.7 format (Riak 2.0 or later): 7 bits
%% of ?VERSION_BYTE, a 1-bit flag that is 1 when the bucket is a
%% `{Type, Bucket}' pair and 0 otherwise, then each name prefixed with
%% its byte length as a 16-bit integer, followed by the raw key.
%%
%% byte_size/1 is used for the length prefixes because the values are
%% emitted as binaries.
make_bk(0, Bucket, Key) ->
    term_to_binary({Bucket, Key});
make_bk(1, {Type, Bucket}, Key) ->
    TypeSz = byte_size(Type),
    BucketSz = byte_size(Bucket),
    <<?VERSION_BYTE:7, 1:1, TypeSz:16/integer, Type/binary,
      BucketSz:16/integer, Bucket/binary, Key/binary>>;
make_bk(1, Bucket, Key) ->
    BucketSz = byte_size(Bucket),
    <<?VERSION_BYTE:7, 0:1, BucketSz:16/integer,
      Bucket/binary, Key/binary>>.
204 changes: 204 additions & 0 deletions priv/tools/internal/select_gc_bucket.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
%% #!/usr/bin/env escript

%% ---------------------------------------------------------------------
%%
%% Copyright (c) 2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% ---------------------------------------------------------------------

-module(select_gc_bucket).

-compile(export_all).
-mode(compile).

-include_lib("riak_cs/include/riak_cs.hrl").

%% State threaded through the fold over the GC bucket.
%%   logger              - device the block list is written to; it is
%%                         undefined until work/7 opens the output file
%%   ring_size           - number of partitions used to compute vnode ids
%%   threshold           - manifests whose content length is below this
%%                         value are skipped
%%   keycount            - number of GC bucket objects seen
%%   manifestcount       - number of manifests written to the output
%%   total_manifestcount - number of manifests seen
%%   blockcount          - number of block entries written to the output
-record(state,
        {logger :: file:io_device() | undefined,
         ring_size = 64 :: non_neg_integer(),
         threshold :: non_neg_integer() | undefined,
         keycount = 0 :: non_neg_integer(),
         manifestcount = 0 :: non_neg_integer(),
         total_manifestcount = 0 :: non_neg_integer(),
         blockcount = 0 :: non_neg_integer()
        }).

%% @doc Returns the getopt option specification of this script, as
%% `{Name, ShortFlag, LongFlag, {Type, Default}, Help}' tuples.
options() ->
    Connection =
        [{host, $h, "host", {string, "localhost"}, "Address of Riak"},
         {port, $p, "port", {integer, 8087}, "Port number of Riak PB"}],
    Selection =
        [{ring_size, $r, "ring-size", {integer, 64}, "Ring Size"},
         {threshold, $t, "threshold", {integer, 5*1024*1024}, "Threshold"}],
    Period =
        [{start, $s, "start", {string, "19700101"}, "Start of the seek period"},
         {'end', $e, "end", {string, "yesterday"}, "End of the seek period"}],
    Execution =
        [{output, $o, "output", {string, "/tmp/tmp.txt"},
          "Output file (absolute path)"},
         {timeout, $w, "timeout", {integer, 6}, "Timeout in seconds"}],
    Connection ++ Selection ++ Period ++ Execution.

%% @doc Fetches the value of `Key' from `Proplist'. When the key has no
%% value the usage text is printed and the script halts.
pgv(Key, Proplist) ->
    value_or_usage(proplists:get_value(Key, Proplist)).

%% Passes a found value through; a missing one ends the script.
value_or_usage(undefined) ->
    getopt:usage(options(), "riak-cs escript me"),
    halt(-1);
value_or_usage(Value) ->
    Value.

%% @doc Converts a date argument into a binary holding a decimal number
%% of seconds, the form used for the fold's start and end keys.
%%
%% Accepts "today", "yesterday" (one day, 86400 seconds, before
%% `riak_cs_gc:timestamp()') or an eight-character "YYYYMMDD" string,
%% which is taken at 00:00:00 and expressed as seconds since
%% 1970-01-01 00:00:00.
maybe_date("today") ->
    seconds_to_key(riak_cs_gc:timestamp());
maybe_date("yesterday") ->
    seconds_to_key(riak_cs_gc:timestamp() - 86400);
maybe_date([Y0,Y1,Y2,Y3,M0,M1,D0,D1]) ->
    Year = list_to_integer([Y0,Y1,Y2,Y3]),
    Month = list_to_integer([M0,M1]),
    Day = list_to_integer([D0,D1]),
    Midnight = calendar:datetime_to_gregorian_seconds({{Year, Month, Day}, {0,0,0}}),
    %% 62167219200 is 1970-01-01 00:00:00 in gregorian seconds.
    seconds_to_key(Midnight - 62167219200).

%% Renders a number of seconds as a decimal binary.
seconds_to_key(Seconds) ->
    list_to_binary(integer_to_list(Seconds)).

%% @doc escript entry point: parses the command line with getopt and
%% starts the fold over the GC bucket. Prints the usage text when the
%% arguments cannot be parsed.
main(Args) ->
    case getopt:parse(options(), Args) of
        {ok, {Options, _NonOptionArgs}} ->
            Opt = fun(Name) -> pgv(Name, Options) end,
            Host = Opt(host),
            Port = Opt(port),
            State = #state{ring_size = Opt(ring_size),
                           threshold = Opt(threshold)},
            %% TODO: make this configurable, take leeway into account
            StartKey = maybe_date(Opt(start)),
            EndKey = maybe_date(Opt('end')),
            OutputFile = Opt(output),
            %% The option is given in seconds; work/7 takes milliseconds.
            TimeoutMs = Opt(timeout) * 1000,
            work(Host, Port, StartKey, EndKey, TimeoutMs, OutputFile, State);
        _ ->
            getopt:usage(options(), "select_gc_bucket.erl")
    end.

%% @doc Connects to Riak, folds over the GC bucket between `StartKey'
%% and `EndKey' and writes the selected blocks to `OutputFile'.
%%
%% `Timeout' is passed to the fold request as its `timeout' option.
%% `State0' carries the ring size and threshold; the opened output file
%% is stored in its `logger' field.
%%
%% The "Finished!" hint is printed only when the fold reported `done'.
%% When it reported `error' a warning is printed instead and `error' is
%% returned, because the output file is then incomplete. The connection
%% and the output file are always released.
work(Host, Port, StartKey, EndKey, Timeout, OutputFile, State0) ->
    io:format(standard_error, "Connecting ~p:~p~n", [Host, Port]),
    Opts = [{start_key, StartKey},
            {end_key, EndKey},
            {timeout, Timeout}],
    {ok, Pid} = riakc_pb_socket:start_link(Host, Port),
    {ok, File} = file:open(OutputFile, [write, delayed_write]),
    State = State0#state{logger=File},
    try
        {ok, ReqID} = riakc_pb_socket:cs_bucket_fold(Pid, ?GC_BUCKET, Opts),
        case handle_fold_results(Pid, ReqID, State) of
            done ->
                io:format(standard_error,
                          "Finished!~n"
                          "Next action is to run offline delete. Use Riak command like this:~n"
                          "$ riak escript /usr/lib/riak-cs/lib/riak-cs_2.0.0/priv/tools/internal/offline_delete.erl --dry-run /var/lib/riak/bitcask ~s~n",
                          [OutputFile]);
            error ->
                io:format(standard_error,
                          "The fold did not complete; ~s is incomplete.~n",
                          [OutputFile]),
                error
        end
    after
        riakc_pb_socket:stop(Pid),
        file:close(File)
    end.

%% @doc Receives the messages of the fold request `ReqID' until it is
%% done, passing every manifest found to handle_manifest/2 and
%% accumulating the counters kept in `State'.
%%
%% Returns `done' after printing the counters once the fold completes,
%% or `error' after printing any other message that arrives.
handle_fold_results(Pid, ReqID, State = #state{total_manifestcount=SeenSoFar,
                                               keycount=KeysSoFar,
                                               manifestcount=MatchedSoFar,
                                               blockcount=BlocksSoFar}) ->
    receive
        {ReqID, {ok, Objs}} ->
            {Seen, Matched, Blocks} =
                lists:foldl(fun(Obj, {S0, M0, B0}) ->
                                    {S1, M1, B1} = tally_gc_object(Obj, State),
                                    {S0 + S1, M0 + M1, B0 + B1}
                            end, {0, 0, 0}, Objs),
            handle_fold_results(Pid, ReqID,
                                State#state{total_manifestcount=Seen+SeenSoFar,
                                            keycount=KeysSoFar+length(Objs),
                                            manifestcount=MatchedSoFar+Matched,
                                            blockcount=BlocksSoFar+Blocks});
        {ReqID, {done, _}} ->
            io:format(standard_error,
                      "keycount: ~p, total_manifestcount ~p, manifestcount ~p, blockcount ~p~n",
                      [KeysSoFar, SeenSoFar, MatchedSoFar, BlocksSoFar]),
            done;
        Other ->
            io:format(standard_error, "Boom!!! Other; ~p", [Other]),
            error
    end.

%% Merges the siblings of one GC bucket object and processes each of its
%% manifests. Returns {ManifestsSeen, ManifestsMatched, BlocksMatched}.
tally_gc_object(Obj, State) ->
    ManifestSet = riak_cs_gc:decode_and_merge_siblings(Obj, twop_set:new()),
    Manifests = twop_set:to_list(ManifestSet),
    {Matched, Blocks} =
        lists:foldl(fun(Manifest, {M0, B0}) ->
                            {M1, B1} = handle_manifest(Manifest, State),
                            {M0 + M1, B0 + B1}
                    end, {0, 0}, Manifests),
    {length(Manifests), Matched, Blocks}.

%% @doc Processes one `{UUID, Manifest}' entry of a GC bucket object.
%% Returns `{MatchedKeys, MatchedBlocks}'.
%%
%% Manifests whose content length is below the threshold are skipped
%% and yield `{0, 0}'. For a `pending_delete' manifest at or above the
%% threshold, one line is written to the output file per block sequence
%% of the manifest: the three vnode ids, the hex-encoded block bucket,
%% block key and UUID, and the sequence number, separated by tabs.
%%
%% A failed write crashes with a badmatch instead of being ignored,
%% so that a truncated block list is never silently produced.
handle_manifest({_UUID,
                 ?MANIFEST{content_length=ContentLength} = _Manifest},
                #state{threshold=Threshold} = _State)
  when ContentLength < Threshold ->
    {0, 0};
handle_manifest({_UUID, ?MANIFEST{bkey=BKey={Bucket,_},
                                  uuid=UUID,
                                  content_length=ContentLength,
                                  state=pending_delete} = M},
                _State = #state{ring_size=RingSize,
                                logger=File}
               ) ->
    io:format(standard_error, "~p (~p) ~p~n", [BKey, mochihex:to_hex(UUID), ContentLength]),
    BlockSequences = riak_cs_lfs_utils:block_sequences_for_manifest(M),
    Count = ordsets:fold(fun({UUID1, SeqNo}, Count0) ->
                                 BK = {B,K} = full_bkey(Bucket, dummy, UUID1, SeqNo),
                                 VNodes = [[integer_to_list(VNode), $\t]
                                           || VNode <- vnode_ids(BK, RingSize, 3)],
                                 %% Partitions, bucket, key, UUID, SeqNo
                                 ok = file:write(File, [VNodes,
                                                        mochihex:to_hex(B), $\t,
                                                        mochihex:to_hex(K), $\t,
                                                        mochihex:to_hex(UUID1), $\t,
                                                        integer_to_list(SeqNo), $\n]),
                                 Count0 + 1
                         end, 0, BlockSequences),
    {1, Count}.

%% From riak_cs
%% @doc Returns the `{Bucket, Key}' pair under which a block is stored:
%% the blocks bucket name derived from `Bucket' and the block name built
%% from `Key', `UUID' and `BlockId'.
full_bkey(Bucket, Key, UUID, BlockId) ->
    {riak_cs_utils:to_bucket_name(blocks, Bucket),
     riak_cs_lfs_utils:block_name(Key, UUID, BlockId)}.

%% Top of the hash space; vnode_id/2 and vnode_ids/3 divide it by the
%% ring size to obtain the distance between two partitions.
%% NOTE: the expression is evaluated in floating point. 2^160 - 1 cannot
%% be represented as a double, so the result is exactly 2^160.
%% NOTE(review): presumably this has to stay identical to the definition
%% riak_core uses so that the computed partition ids match the ones of
%% the cluster -- confirm before changing it to integer arithmetic.
-define(RINGTOP, trunc(math:pow(2,160)-1)). % SHA-1 space

%% (hash({B,K}) div Inc)

%% @doc Returns the id of the first partition responsible for `BKey' in
%% a ring of `RingSize' partitions, i.e. the head of vnode_ids/3.
vnode_id(BKey, RingSize) ->
    [VNode] = vnode_ids(BKey, RingSize, 1),
    VNode.

%% @doc Returns the ids of the `NVal' consecutive partitions responsible
%% for `BKey' in a ring of `RingSize' partitions.
%%
%% The key hash is mapped to the partition following the one it falls
%% into, and the list wraps around at the end of the ring.
vnode_ids(BKey, RingSize, NVal) ->
    <<HashKey:160/integer>> = key_of(BKey),
    Inc = ?RINGTOP div RingSize,
    First = ((HashKey div Inc) + 1) rem RingSize,
    lists:map(fun(Offset) ->
                      ((First + Offset) rem RingSize) * Inc
              end, lists:seq(0, NVal - 1)).

%% From riak_core
%% @doc Returns the SHA-1 digest of `Bin'.
sha(Bin) ->
    Digest = crypto:hash(sha, Bin),
    Digest.

%% @doc Hashes an arbitrary term: the SHA-1 digest of its external term
%% format.
key_of(ObjectName) ->
    Encoded = term_to_binary(ObjectName),
    sha(Encoded).

0 comments on commit 83a7eb2

Please sign in to comment.