Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix log divergence bug #508

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 141 additions & 93 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -522,58 +522,60 @@ handle_leader({PeerId, #append_entries_reply{success = false}},
[LogId, PeerId]),
{leader, State0, []};
handle_leader({PeerId, #append_entries_reply{success = false,
next_index = NextIdx,
last_index = LastIdx,
last_term = LastTerm}},
State0 = #{cfg := #cfg{log_id = LogId} = Cfg,
cluster := Nodes, log := Log0}) ->
next_index = PeerNextIdx,
last_index = PeerLastIdx,
last_term = PEER_LAST_TERM}},
#{cfg := #cfg{log_id = LogId} = Cfg,
cluster := Nodes, log := Log0} = State0) ->
ok = incr_counter(Cfg, ?C_RA_SRV_AER_REPLIES_FAILED, 1),
#{PeerId := Peer0 = #{match_index := MI,
next_index := NI}} = Nodes,
#{PeerId := #{match_index := MI,
next_index := NI} = Peer0} = Nodes,
% if the last_index exists and has a matching term we can forward
% match_index and update next_index directly
{Peer, Log} = case ra_log:fetch_term(LastIdx, Log0) of
{undefined, L} ->
% entry was not found - simply set next index to
{Peer, Log} = case ra_log:fetch_term(PeerLastIdx, Log0) of
{undefined, Log1} ->
% entry was not found
?DEBUG("~ts: setting next index for ~w ~b",
[LogId, PeerId, NextIdx]),
%% TODO: match index should not be set here surely??
{Peer0#{match_index => LastIdx,
next_index => NextIdx}, L};
[LogId, PeerId, PeerNextIdx]),
{Peer0#{next_index => PeerNextIdx}, Log1};
% entry exists we can forward
{LastTerm, L} when LastIdx >= MI ->
{PEER_LAST_TERM = Term, Log1}
when is_integer(Term) andalso
PeerLastIdx >= MI ->
?DEBUG("~ts: setting last index to ~b, "
" next_index ~b for ~w",
[LogId, LastIdx, NextIdx, PeerId]),
{Peer0#{match_index => LastIdx,
next_index => NextIdx}, L};
{_Term, L} when LastIdx < MI ->
[LogId, PeerLastIdx, PeerNextIdx, PeerId]),
{Peer0#{match_index => PeerLastIdx,
next_index => PeerNextIdx}, Log1};
{_OtherTerm, Log1}
when PeerLastIdx < MI ->
% TODO: this can only really happen when peers are
% non-persistent.
% should they turn-into non-voters when this sitution
% is detected
% Should they turn-into non-voters when this sitution
% is detected??
?WARN("~ts: leader saw peer with last_index [~b in ~b]"
" lower than recorded match index [~b]."
"Resetting peer's state to last_index.",
[LogId, LastIdx, LastTerm, MI]),
{Peer0#{match_index => LastIdx,
next_index => LastIdx + 1}, L};
{EntryTerm, L} ->
NextIndex = max(min(NI-1, LastIdx), MI),
[LogId, PeerLastIdx, PEER_LAST_TERM, MI]),
{Peer0#{match_index => PeerLastIdx,
next_index => PeerLastIdx + 1}, Log1};
{EntryTerm, Log1} ->
% last_index has a different term or entry does not
% exist
% The peer must have uncommitted entries from a prior
% term that have now been overwritten by the current
% leader.
% Decrement next_index but don't go lower than
% match index.
NextIndex = max(min(NI-1, PeerNextIdx), MI),
?DEBUG("~ts: leader received last_index ~b"
" from ~w with term ~b "
"- expected term ~b. Setting "
"next_index to ~b",
[LogId, LastIdx, PeerId, LastTerm, EntryTerm,
[LogId, PeerLastIdx, PeerId,
PEER_LAST_TERM, EntryTerm,
NextIndex]),
% last_index has a different term or entry does not
% exist
% The peer must have received an entry from a previous
% leader and the current leader wrote a different
% entry at the same index in a different term.
% decrement next_index but don't go lower than
% match index.
{Peer0#{next_index => NextIndex}, L}
{Peer0#{next_index => NextIndex}, Log1}
end,
State1 = State0#{cluster => Nodes#{PeerId => Peer}, log => Log},
{State, _, Effects} = make_pipelined_rpc_effects(State1, []),
Expand Down Expand Up @@ -1166,58 +1168,89 @@ handle_follower(#append_entries_rpc{term = Term,
prev_log_index = PLIdx,
prev_log_term = PLTerm,
entries = Entries0},
State00 = #{cfg := #cfg{log_id = LogId,
id = Id} = Cfg,
log := Log00,
commit_index := CommitIndex,
current_term := CurTerm})
#{cfg := #cfg{log_id = LogId,
id = Id} = Cfg,
log := Log00,
last_applied := LastApplied,
current_term := CurTerm} = State00)
when Term >= CurTerm ->
ok = incr_counter(Cfg, ?C_RA_SRV_AER_RECEIVED_FOLLOWER, 1),
ok = put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_INDEX, LeaderCommit),
%% this is a valid leader, append entries message
Effects0 = [{record_leader_msg, LeaderId}],
State0 = update_term(Term, State00#{leader_id => LeaderId,
commit_index => LeaderCommit}),
State0 = update_term(Term, State00#{leader_id => LeaderId}),
case has_log_entry_or_snapshot(PLIdx, PLTerm, Log00) of
{entry_ok, Log0} ->
% filter entries already seen
{Log1, Entries} = drop_existing({Log0, Entries0}),
{Log1, Entries, LastValidIdx} =
drop_existing(Log0, Entries0, PLIdx),
case Entries of
[] ->
%% all entries have alrady been written
ok = incr_counter(Cfg, ?C_RA_SRV_AER_RECEIVED_FOLLOWER_EMPTY, 1),
{LastIdx, _} = ra_log:last_index_term(Log1),
Log2 = case Entries0 of
[] when LastIdx > PLIdx ->
%% if no entries were sent we need to reset
%% last index to match the leader
?DEBUG("~ts: resetting last index to ~b from ~b "
"in term ~b",
[LogId, PLIdx, LastIdx, Term]),
?assertNot(PLIdx < CommitIndex),
{ok, L} = ra_log:set_last_index(PLIdx, Log1),
L;
_ ->
Log1
end,
%% if nothing was appended we need to send a reply here
State1 = State0#{log => Log2},
% evaluate commit index as we may have received an updated
% commit_index for previously written entries
{NextState, State, Effects} =
evaluate_commit_index_follower(State1, Effects0),
% TODO: only send a reply if there is no pending write
% between the follower and the wal as the written event
% will trigger a reply anyway
Reply = append_entries_reply(Term, true, State),
{NextState, State,
[cast_reply(Id, LeaderId, Reply) | Effects]};
_ ->
State1 = lists:foldl(fun pre_append_log_follower/2,
State0, Entries),
{LocalLastIdx, _} = ra_log:last_index_term(Log1),
{LogIsValidated, Log2} =
case Entries0 of
[] when LocalLastIdx > PLIdx ->
%% if no entries were sent we need to reset
%% last index to match the leader
?DEBUG("~ts: resetting last index to ~b from ~b "
"in term ~b",
[LogId, PLIdx, LocalLastIdx, Term]),
?assertNot(PLIdx < LastApplied),
{ok, L} = ra_log:set_last_index(PLIdx, Log1),
%% a reset means we've validated the last index
{true, L};
_ ->
%% local last index is lower or equal to the
%% last validated index
%% in this case we can take the updated leader
%% commit as the end of the local log has been
%% validated
{LocalLastIdx =< LastValidIdx, Log1}
end,
case LogIsValidated of
true ->
State1 = State0#{log => Log2,
commit_index => LeaderCommit},
ok = put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_INDEX,
LeaderCommit),
%% evaluate commit index as we may have received
%% an updated commit_index for previously
%% written entries
{NextState, State, Effects} =
evaluate_commit_index_follower(State1, Effects0),
%% log is validated so send a successful reply
Reply = append_entries_reply(Term, true, State),
{NextState, State,
[cast_reply(Id, LeaderId, Reply) | Effects]};
false ->
%% TODO: we need to ensure we make progress in case
%% the last applied index is lower than the last
%% valid index
LastValidatedIdx = max(LastApplied, LastValidIdx),
?DEBUG("~ts: append_entries_rpc with last index ~b "
" including ~b entries did not validate local log. "
"Requesting resend from index ~b",
[LogId, PLIdx, length(Entries0),
LastValidatedIdx + 1]),
{Reply, State} =
mismatch_append_entries_reply(Term, LastValidatedIdx,
State0#{log => Log2}),
{follower, State,
[cast_reply(Id, LeaderId, Reply)]}
end;
[{FstIdx, _, _} | _] ->
State1 = State0#{commit_index => LeaderCommit},
ok = put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_INDEX,
LeaderCommit),
%% assert we're not writing below the last applied index
?assertNot(FstIdx < LastApplied),
State2 = lists:foldl(fun pre_append_log_follower/2,
State1, Entries),
case ra_log:write(Entries, Log1) of
{ok, Log2} ->
{NextState, State, Effects} =
evaluate_commit_index_follower(State1#{log => Log2},
evaluate_commit_index_follower(State2#{log => Log2},
Effects0),
{NextState, State, Effects};
{error, wal_down} ->
Expand All @@ -1230,12 +1263,10 @@ handle_follower(#append_entries_rpc{term = Term,
%% alternative where the WAL writes the last index, term
%% it wrote for each UID into an ETS table and query
%% this.
Log = Log1,
{await_condition,
State1#{log => Log,
State2#{log => Log1,
condition =>
#{predicate_fun => fun wal_down_condition/2}
},
#{predicate_fun => fun wal_down_condition/2}},
Effects0};
{error, _} = Err ->
exit(Err)
Expand Down Expand Up @@ -1268,8 +1299,8 @@ handle_follower(#append_entries_rpc{term = Term,
% that also has made progress
% As the follower is responsible for telling the leader
% which their next expected entry is the best we can do here
% is rewind back and use the commit index as the last index
% and commit_index + 1 as the next expected.
% is rewind back and use the last applied as the last index
% and last applied + 1 as the next expected.
% This _may_ overwrite some valid entries but is probably the
% simplest way to proceed
{Reply, State} = mismatch_append_entries_reply(Term, LastApplied,
Expand Down Expand Up @@ -1424,6 +1455,23 @@ handle_follower(#install_snapshot_rpc{term = Term,
{receive_snapshot, update_term(Term, State0#{log => Log,
leader_id => LeaderId}),
[{next_event, Rpc}, {record_leader_msg, LeaderId}]};
handle_follower(#install_snapshot_rpc{term = Term,
meta = #{index := LastIndex,
machine_version := SnapMacVer,
term := LastTerm}},
#{cfg := #cfg{log_id = LogId,
machine_version = MacVer},
current_term := CurTerm} = State)
when MacVer >= SnapMacVer ->
?DEBUG("~ts: install_snapshot received with lower last index ~b in ~b",
[LogId, LastIndex, Term]),
% follower receives a snapshot for an index lower than its last applied
% index, just reply to trigger the leader to send the append entries
% rpc at which point negotiation on the next index to send begins
Reply = #install_snapshot_result{term = CurTerm,
last_term = LastTerm,
last_index = LastIndex},
{follower, State, [{reply, Reply}]};
handle_follower(#request_vote_result{}, State) ->
%% handle to avoid logging as unhandled
{follower, State, []};
Expand Down Expand Up @@ -3066,17 +3114,17 @@ append_cluster_change(Cluster, From, ReplyMode,
{not_appended, wal_down, State, Effects}
end.

mismatch_append_entries_reply(Term, CommitIndex, State0) ->
{CITerm, State} = fetch_term(CommitIndex, State0),
% assert CITerm is found
false = CITerm =:= undefined,
mismatch_append_entries_reply(Term, LastAppliedIdx, State0) ->
{LATerm, State} = fetch_term(LastAppliedIdx, State0),
% assert LATerm is found
?assert(LATerm =/= undefined),
{#append_entries_reply{term = Term, success = false,
next_index = CommitIndex + 1,
last_index = CommitIndex,
last_term = CITerm},
next_index = LastAppliedIdx + 1,
last_index = LastAppliedIdx,
last_term = LATerm},
State}.

append_entries_reply(Term, Success, State = #{log := Log}) ->
append_entries_reply(Term, Success, #{log := Log} = State) ->
% we can't use the the last received idx
% as it may not have been persisted yet
% also we can't use the last writted Idx as then
Expand Down Expand Up @@ -3156,14 +3204,14 @@ fold_log_from(From, Folder, {St, Log0}) ->
{ok, {St1, Log}}
end.

drop_existing({Log0, []}) ->
{Log0, []};
drop_existing({Log0, [{Idx, Trm, _} | Tail] = Entries}) ->
drop_existing(Log0, [], LastIdx) ->
{Log0, [], LastIdx};
drop_existing(Log0, [{Idx, Trm, _} | Tail] = Entries, LastIdx) ->
case ra_log:exists({Idx, Trm}, Log0) of
{true, Log} ->
drop_existing({Log, Tail});
drop_existing(Log, Tail, Idx);
{false, Log} ->
{Log, Entries}
{Log, Entries, LastIdx}
end.

cast_reply(From, To, Msg) ->
Expand Down
Loading