Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dotted Version Vectors Sets #572

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
850584d
First working commit, with dotted version vectors.
ricardobcl Apr 10, 2012
cf0be54
Corrected few errors and all unit tests now work.
ricardobcl Apr 12, 2012
5b7040b
In reconcile/1, descends already tests if params are equal.
ricardobcl May 21, 2012
19666c4
More efficient reconcile/1, doing pair-wise syncs
ricardobcl May 23, 2012
554ead8
Merge with branch compactDVV
ricardobcl Nov 24, 2012
921f176
Correct version of riak_object.erl
ricardobcl Nov 24, 2012
2187ee1
Use my fork of riak_core with supoport to dotted version vectors
ricardobcl Nov 24, 2012
fcc0578
Script to run dstat
ricardobcl Nov 27, 2012
9487ff8
Merged with basho master branch on 30/11/2012
ricardobcl Nov 30, 2012
7c6f4d7
Deleted dstat_script.sh file
ricardobcl Nov 30, 2012
d82b865
Corrected duplicate riak_kv_index_req_v2 in riak_kv_vnode.hrl
ricardobcl Dec 1, 2012
94aaa7f
Delete extra files that were generated from mergetool
ricardobcl Dec 12, 2012
7574ed4
Deleted some commented files and updated some docs
ricardobcl Dec 14, 2012
70dd894
Support for Compact Version of DVV.
ricardobcl Jan 3, 2013
bfd1c04
Updated rebar to use the correct branch of riak_core.
ricardobcl Jan 3, 2013
ddcea0b
Corrected get_contents/1, obj_not_deleted/1 and simplified hash_objec…
ricardobcl Jan 4, 2013
e1492fe
Corrected a few bugs and unit tests now all pass.
ricardobcl Jan 9, 2013
e8e9ece
Corrected some bugs, namely set_vclock/2, reconcile/2, update_vclock/…
ricardobcl Jan 12, 2013
f6d961d
Use lww while comparing timestamps in metadata.
ricardobcl Jan 14, 2013
dd3ac7f
Support for new causal clock: dvvset.
ricardobcl Jan 25, 2013
9e96b6d
Correct bug in riak_object:set_vclock and changed riak_object:new_vcl…
ricardobcl Jan 25, 2013
33fefaf
Corrected syntax error in riak_object:set_vclock/2
ricardobcl Jan 28, 2013
f784200
Updated calls to functions from dvvset.erl in riak_object.erl
ricardobcl Feb 15, 2013
a337eaf
Corrected riak_object:equal_vclock.
ricardobcl Feb 20, 2013
457558e
Minor reafactoring to riak_object.erl
ricardobcl Feb 28, 2013
407d19c
removed some debug prints
ricardobcl Jun 5, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
]}.

{deps, [
{riak_core, ".*", {git, "git://github.com/basho/riak_core", "master"}},
{riak_core, ".*", {git, "git://github.com/ricardobcl/riak_core", "dvvset"}},
{erlang_js, ".*", {git, "git://github.com/basho/erlang_js", "master"}},
{bitcask, ".*", {git, "git://github.com/basho/bitcask", "master"}},
{merge_index, ".*", {git, "git://github.com/basho/merge_index",
Expand Down
6 changes: 3 additions & 3 deletions src/riak_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ delete(Bucket,Key,Options,Timeout) when is_list(Options) ->
delete(Bucket,Key,RW,Timeout) ->
delete(Bucket,Key,[{rw, RW}], Timeout).

%% @spec delete_vclock(riak_object:bucket(), riak_object:key(), vclock:vclock()) ->
%% @spec delete_vclock(riak_object:bucket(), riak_object:key(), dvvset:clock()) ->
%% ok |
%% {error, too_many_fails} |
%% {error, notfound} |
Expand All @@ -266,7 +266,7 @@ delete(Bucket,Key,RW,Timeout) ->
delete_vclock(Bucket,Key,VClock) ->
delete_vclock(Bucket,Key,VClock,[{rw,default}],?DEFAULT_TIMEOUT).

%% @spec delete_vclock(riak_object:bucket(), riak_object:key(), vclock::vclock(), RW :: integer()) ->
%% @spec delete_vclock(riak_object:bucket(), riak_object:key(), dvvset:clock(), RW :: integer()) ->
%% ok |
%% {error, too_many_fails} |
%% {error, notfound} |
Expand All @@ -280,7 +280,7 @@ delete_vclock(Bucket,Key,VClock,Options) when is_list(Options) ->
delete_vclock(Bucket,Key,VClock,RW) ->
delete_vclock(Bucket,Key,VClock,[{rw, RW}],?DEFAULT_TIMEOUT).

%% @spec delete_vclock(riak_object:bucket(), riak_object:key(), vclock:vclock(), RW :: integer(),
%% @spec delete_vclock(riak_object:bucket(), riak_object:key(), dvvset:clock(), RW :: integer(),
%% TimeoutMillisecs :: integer()) ->
%% ok |
%% {error, too_many_fails} |
Expand Down
2 changes: 1 addition & 1 deletion src/riak_index.erl
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ parse_object_hook_test() ->
end,

?assertMatch(
{r_object, _, _, _, _, _, _},
{r_object, _, _, _, _, _},
F([
{<<"field_bin">>, <<"A">>},
{<<"field_int">>, <<"1">>}
Expand Down
4 changes: 1 addition & 3 deletions src/riak_kv_backup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,10 @@ read_and_restore_function(Client, BinTerm) ->
make_binary_bucket(Bucket, Key, OriginalObj) when is_atom(Bucket) ->
Bucket1 = list_to_binary(atom_to_list(Bucket)),
OriginalContents = riak_object:get_contents(OriginalObj),
OriginalVClock = riak_object:vclock(OriginalObj),

% We can't change the bucket name without creating a new object...
NewObj = riak_object:new(Bucket1, Key, placeholder),
NewObj1 = riak_object:set_contents(NewObj, OriginalContents),
_NewObj2 = riak_object:set_vclock(NewObj1, OriginalVClock);
_NewObj1 = riak_object:set_contents(NewObj, OriginalContents);

%% If the bucket name is a binary, just pass it on through...
make_binary_bucket(Bucket, _Key, Obj) when is_binary(Bucket) -> Obj.
Expand Down
8 changes: 4 additions & 4 deletions src/riak_kv_delete.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ delete(ReqId,Bucket,Key,Options,Timeout,Client,ClientId,undefined) ->
case C:get(Bucket,Key,[{r,R},{pr,PR},{timeout,Timeout}]) of
{ok, OrigObj} ->
RemainingTime = Timeout - (riak_core_util:moment() - RealStartTime),
delete(ReqId,Bucket,Key,Options,RemainingTime,Client,ClientId,riak_object:vclock(OrigObj));
delete(ReqId,Bucket,Key,Options,RemainingTime,Client,ClientId,riak_object:get_vclock(OrigObj));
{error, notfound} ->
?DTRACE(?C_DELETE_INIT1, [-2], []),
Client ! {ReqId, {error, notfound}};
Expand All @@ -85,7 +85,7 @@ delete(ReqId,Bucket,Key,Options,Timeout,Client,ClientId,VClock) ->
{W, PW, DW} ->
Obj0 = riak_object:new(Bucket, Key, <<>>, dict:store(?MD_DELETED,
"true", dict:new())),
Tombstone = riak_object:set_vclock(Obj0, VClock),
Tombstone = riak_object:set_vclock(Obj0, VClock), %% same value as current Obj0
{ok,C} = riak:local_client(ClientId),
Reply = C:put(Tombstone, [{w,W},{pw,PW},{dw, DW},{timeout,Timeout}]),
Client ! {ReqId, Reply},
Expand Down Expand Up @@ -240,7 +240,7 @@ invalid_w_delete() ->
Key = <<"testkey">>,
Timeout = 60000,
riak_kv_delete_sup:start_delete(node(), [RequestId, Bucket, Key, [{w,W}],
Timeout, self(), undefined, vclock:fresh()]),
Timeout, self(), undefined, riak_object:new_vclock()]),
%% Wait for error response
receive
{_RequestId, Result} ->
Expand Down Expand Up @@ -275,7 +275,7 @@ invalid_pw_delete() ->
Key = <<"testkey">>,
Timeout = 60000,
riak_kv_delete_sup:start_delete(node(), [RequestId, Bucket, Key,
[{pw,PW}], Timeout, self(), undefined, vclock:fresh()]),
[{pw,PW}], Timeout, self(), undefined, riak_object:new_vclock()]),
%% Wait for error response
receive
{_RequestId, Result} ->
Expand Down
19 changes: 8 additions & 11 deletions src/riak_kv_encoding_migrate.erl
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,8 @@ decode_object(RO) ->
copy_object(RO, B, K) ->
{ok, RC} = riak:local_client(),
NO1 = riak_object:new(B, K, <<>>),
NO2 = riak_object:set_vclock(NO1, riak_object:vclock(RO)),
NO3 = riak_object:set_contents(NO2, riak_object:get_contents(RO)),
RC:put(NO3).
NO2 = riak_object:set_contents(NO1, riak_object:get_contents(RO)),
RC:put(NO2).

%% Force writes to fail to test failure behavior
precommit_fail(_) ->
Expand Down Expand Up @@ -308,19 +307,17 @@ test_migration() ->
{ok, []} = riak_kv_encoding_migrate:delete_migrated_objects(EObjs),
{not_needed, [], []} = riak_kv_encoding_migrate:check_cluster(),

C1 = riak_object:get_contents(O2),
V1 = riak_object:vclock(O2),
C1 = riak_object:get_md_values(O2),

C2 = riak_object:get_contents(O4),
V2 = riak_object:vclock(O4),
C2 = riak_object:get_md_values(O4),

{ok, MO1} = RC:get(<<"me@mine">>, <<"key">>),
nearly_equal_contents(C1, riak_object:get_contents(MO1)),
true = vclock:descends(riak_object:vclock(MO1), V1),
nearly_equal_contents(C1, riak_object:get_md_values(MO1)),
true = riak_object:descendant(MO1, O2),

{ok, MO2} = RC:get(<<"bucket">>, <<"key@">>),
nearly_equal_contents(C2, riak_object:get_contents(MO2)),
true = vclock:descends(riak_object:vclock(MO2), V2),
nearly_equal_contents(C2, riak_object:get_md_values(MO2)),
true = riak_object:descendant(MO2, O4),

%% Use precommit hook to test failure scenarios
O7 = riak_object:new(<<"fail">>, <<"key%40">>, <<"value">>),
Expand Down
8 changes: 2 additions & 6 deletions src/riak_kv_get_core.erl
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ response(GetCore = #getcore{r = R, num_ok = NumOk, num_notfound = NumNotFound,
ok ->
Merged; % {ok, MObj}
tombstone when DeletedVClock ->
{error, {deleted, riak_object:vclock(MObj)}};
{error, {deleted, riak_object:get_vclock(MObj)}};
_ -> % tombstone or notfound
{error, notfound}
end;
Expand Down Expand Up @@ -160,7 +160,7 @@ final_action(GetCore = #getcore{n = N, merged = Merged0, results = Results,
[];
_ -> % ok or tombstone
[{Idx, outofdate} || {Idx, {ok, RObj}} <- Results,
strict_descendant(MObj, RObj)] ++
riak_object:strict_descendant(MObj, RObj)] ++
[{Idx, notfound} || {Idx, {error, notfound}} <- Results]
end,
Action = case ReadRepairs of
Expand Down Expand Up @@ -203,10 +203,6 @@ info(#getcore{num_ok = NumOks, num_fail = NumFail, results = Results}) ->
%% Internal functions
%% ====================================================================

strict_descendant(O1, O2) ->
vclock:descends(riak_object:vclock(O1),riak_object:vclock(O2)) andalso
not vclock:descends(riak_object:vclock(O2),riak_object:vclock(O1)).

merge(Replies, AllowMult) ->
RObjs = [RObj || {_I, {ok, RObj}} <- Replies],
case RObjs of
Expand Down
4 changes: 2 additions & 2 deletions src/riak_kv_get_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -466,10 +466,10 @@ update_stats(_, #state{ bkey = {Bucket, _}, tracked_bucket = StatTracked, calcul
%% calling term_to_binary/1, but it should be easier on memory,
%% especially for objects with large values.
calculate_objsize(Bucket, Obj) ->
Contents = riak_object:get_contents(Obj),
Contents = riak_object:get_md_values(Obj),
size(Bucket) +
size(riak_object:key(Obj)) +
size(term_to_binary(riak_object:vclock(Obj))) +
size(term_to_binary(riak_object:get_vclock(Obj))) +
lists:sum([size(term_to_binary(MD)) + value_size(Value) || {MD, Value} <- Contents]).

value_size(Value) when is_binary(Value) -> size(Value);
Expand Down
6 changes: 1 addition & 5 deletions src/riak_kv_index_hashtree.erl
Original file line number Diff line number Diff line change
Expand Up @@ -358,11 +358,7 @@ load_built(#state{trees=Trees}) ->
%% Generate a hash value for a binary-encoded `riak_object'
-spec hash_object(riak_object_t2b()) -> binary().
hash_object(RObjBin) ->
%% Normalize the `riak_object' vector clock before hashing
RObj = binary_to_term(RObjBin),
Vclock = riak_object:vclock(RObj),
UpdObj = riak_object:set_vclock(RObj, lists:sort(Vclock)),
Hash = erlang:phash2(term_to_binary(UpdObj)),
Hash = erlang:phash2(RObjBin),
term_to_binary(Hash).

%% Fold over a given vnode's data, inserting each object into the appropriate
Expand Down
16 changes: 8 additions & 8 deletions src/riak_kv_pb_object.erl
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ process(#rpbgetreq{bucket=B, key=K, r=R0, pr=PR0, notfound_ok=NFOk,
make_option(notfound_ok, NFOk) ++
make_option(basic_quorum, BQ)) of
{ok, O} ->
case erlify_rpbvc(VClock) == riak_object:vclock(O) of
case riak_object:equal_vclock(erlify_rpbvc(VClock),riak_object:get_vclock(O)) of
true ->
{reply, #rpbgetresp{unchanged = true}, State};
_ ->
Contents = riak_object:get_contents(O),
Contents = riak_object:get_md_values(O),
PbContent = case Head of
true ->
%% Remove all the 'value' fields from the contents
Expand All @@ -127,7 +127,7 @@ process(#rpbgetreq{bucket=B, key=K, r=R0, pr=PR0, notfound_ok=NFOk,
riak_pb_kv_codec:encode_contents(Contents)
end,
{reply, #rpbgetresp{content = PbContent,
vclock = pbify_rpbvc(riak_object:vclock(O))}, State}
vclock = pbify_rpbvc(riak_object:get_vclock(O))}, State}
end;
{error, {deleted, TombstoneVClock}} ->
%% Found a tombstone - return its vector clock so it can
Expand All @@ -146,7 +146,7 @@ process(#rpbputreq{bucket=B, key=K, vclock=PbVC,
{ok, _} when NoneMatch ->
{error, "match_found", State};
{ok, O} when NotMod ->
case erlify_rpbvc(PbVC) == riak_object:vclock(O) of
case erlify_rpbvc(PbVC) == riak_object:get_vclock(O) of
true ->
process(Req#rpbputreq{if_not_modified=undefined,
if_none_match=undefined},
Expand Down Expand Up @@ -203,7 +203,7 @@ process(#rpbputreq{bucket=B, key=K, vclock=PbVC, content=RpbContent,
ok ->
{reply, #rpbputresp{}, State};
{ok, Obj} ->
Contents = riak_object:get_contents(Obj),
Contents = riak_object:get_md_values(Obj),
PbContents = case ReturnHead of
true ->
%% Remove all the 'value' fields from the contents
Expand All @@ -215,7 +215,7 @@ process(#rpbputreq{bucket=B, key=K, vclock=PbVC, content=RpbContent,
riak_pb_kv_codec:encode_contents(Contents)
end,
PutResp = #rpbputresp{content = PbContents,
vclock = pbify_rpbvc(riak_object:vclock(Obj)),
vclock = pbify_rpbvc(riak_object:get_vclock(Obj)),
key = ReturnKey
},
{reply, PutResp, State};
Expand Down Expand Up @@ -289,9 +289,9 @@ make_option(K, V) ->

%% Convert a vector clock to erlang
erlify_rpbvc(undefined) ->
vclock:fresh();
riak_object:new_vclock();
erlify_rpbvc(<<>>) ->
vclock:fresh();
riak_object:new_vclock();
erlify_rpbvc(PbVc) ->
binary_to_term(zlib:unzip(PbVc)).

Expand Down
2 changes: 1 addition & 1 deletion src/riak_kv_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ is_x_deleted(Obj) ->
%% deleted. Return is the atom 'undefined' if all contents
%% are marked deleted, or the input Obj if any of them are not.
obj_not_deleted(Obj) ->
case [{M, V} || {M, V} <- riak_object:get_contents(Obj),
case [{M, V} || {M, V} <- riak_object:get_md_values(Obj),
dict:is_key(<<"X-Riak-Deleted">>, M) =:= false] of
[] -> undefined;
_ -> Obj
Expand Down
70 changes: 15 additions & 55 deletions src/riak_kv_vnode.erl
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ do_put(Sender, {Bucket,_Key}=BKey, RObj, ReqID, StartTime, Options, State) ->
PruneTime = StartTime
end,
Coord = proplists:get_value(coord, Options, false),
PutArgs = #putargs{returnbody=proplists:get_value(returnbody,Options,false) orelse Coord,
PutArgs = #putargs{returnbody=Coord orelse proplists:get_value(returnbody,Options,false),
coord=Coord,
lww=proplists:get_value(last_write_wins, BProps, false),
bkey=BKey,
Expand Down Expand Up @@ -795,7 +795,7 @@ prepare_put(#state{vnodeid=VId,
coord=Coord,
lww=LWW,
starttime=StartTime,
prunetime=PruneTime},
prunetime=_PruneTime},
IndexBackend) ->
case Mod:get(Bucket, Key, ModState) of
{error, not_found, _UpdModState} ->
Expand All @@ -818,7 +818,6 @@ prepare_put(#state{vnodeid=VId,
{oldobj, OldObj1} ->
{{false, OldObj1}, PutArgs};
{newobj, NewObj} ->
VC = riak_object:vclock(NewObj),
AMObj = enforce_allow_mult(NewObj, BProps),
case IndexBackend of
true ->
Expand All @@ -828,17 +827,7 @@ prepare_put(#state{vnodeid=VId,
false ->
IndexSpecs = []
end,
case PruneTime of
undefined ->
ObjToStore = AMObj;
_ ->
ObjToStore =
riak_object:set_vclock(AMObj,
vclock:prune(VC,
PruneTime,
BProps))
end,
{{true, ObjToStore},
{{true, AMObj},
PutArgs#putargs{index_specs=IndexSpecs, is_index=IndexBackend}}
end
end.
Expand Down Expand Up @@ -883,54 +872,25 @@ enforce_allow_mult(Obj, BProps) ->
case proplists:get_value(allow_mult, BProps) of
true -> Obj;
_ ->
case riak_object:get_contents(Obj) of
[_] -> Obj;
Mult ->
{MD, V} = select_newest_content(Mult),
riak_object:set_contents(Obj, [{MD, V}])
case riak_object:value_count(Obj) of
1 -> Obj;
_ ->
riak_object:set_lww(Obj)
end
end.

%% @private
%% choose the latest content to store for the allow_mult=false case
select_newest_content(Mult) ->
hd(lists:sort(
fun({MD0, _}, {MD1, _}) ->
riak_core_util:compare_dates(
dict:fetch(<<"X-Riak-Last-Modified">>, MD0),
dict:fetch(<<"X-Riak-Last-Modified">>, MD1))
end,
Mult)).

%% @private
put_merge(false, true, _CurObj, UpdObj, _VId, _StartTime) -> % coord=false, LWW=true
{newobj, UpdObj};
put_merge(false, false, CurObj, UpdObj, _VId, _StartTime) -> % coord=false, LWW=false
ResObj = riak_object:syntactic_merge(CurObj, UpdObj),
case ResObj =:= CurObj of
true ->
{oldobj, CurObj};
false ->
{newobj, ResObj}
end;
put_merge(true, true, _CurObj, UpdObj, VId, StartTime) -> % coord=false, LWW=true
{newobj, riak_object:syntactic_merge(CurObj, UpdObj)};
put_merge(true, true, _CurObj, UpdObj, VId, StartTime) -> % coord=true, LWW=true
{newobj, riak_object:increment_vclock(UpdObj, VId, StartTime)};
put_merge(true, false, CurObj, UpdObj, VId, StartTime) ->
UpdObj1 = riak_object:increment_vclock(UpdObj, VId, StartTime),
UpdVC = riak_object:vclock(UpdObj1),
CurVC = riak_object:vclock(CurObj),

%% Check the coord put will replace the existing object
case vclock:get_counter(VId, UpdVC) > vclock:get_counter(VId, CurVC) andalso
vclock:descends(CurVC, UpdVC) == false andalso
vclock:descends(UpdVC, CurVC) == true of
true ->
{newobj, UpdObj1};
false ->
%% If not, make sure it does
{newobj, riak_object:increment_vclock(
riak_object:merge(CurObj, UpdObj1), VId, StartTime)}
end.
CurObj1 = riak_object:apply_updates(CurObj),
UpdObj1 = riak_object:apply_updates(UpdObj),
ResObj = riak_object:update_vclock(UpdObj1, CurObj1, VId, StartTime),
{newobj, ResObj}.

%% @private
do_get(_Sender, BKey, ReqID,
Expand Down Expand Up @@ -1113,8 +1073,8 @@ do_get_vclocks(KeyList,_State=#state{mod=Mod,modstate=ModState}) ->
%% @private
do_get_vclock({Bucket, Key}, Mod, ModState) ->
case Mod:get(Bucket, Key, ModState) of
{error, not_found, _UpdModState} -> vclock:fresh();
{ok, Val, _UpdModState} -> riak_object:vclock(binary_to_term(Val))
{error, not_found, _UpdModState} -> riak_object:new_vclock();
{ok, Val, _UpdModState} -> riak_object:get_vclock(binary_to_term(Val))
end.

%% @private
Expand Down
2 changes: 1 addition & 1 deletion src/riak_kv_wm_link_walker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ multipart_encode_body(NestedResults, Ctx) when is_list(NestedResults) ->
multipart_encode_body(RiakObject, Ctx) ->
APIVersion = Ctx#ctx.api_version,
Prefix = Ctx#ctx.prefix,
[{MD, V}|Rest] = riak_object:get_contents(RiakObject),
[{MD, V}|Rest] = riak_object:get_md_values(RiakObject),
{VHead, Vclock} = riak_kv_wm_utils:vclock_header(RiakObject),
[VHead,": ",Vclock,"\r\n",

Expand Down
Loading