Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE#268] Fix wrong ledgerIndex and wrong generated snapshot in snapshot mode #269

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,11 @@ public void doWork() {
if (DLedgerUtils.elapsed(lastPrintWatermarkTimeMs) > 3000) {
if (DLedgerEntryPusher.this.fsmCaller.isPresent()) {
final long lastAppliedIndex = DLedgerEntryPusher.this.fsmCaller.get().getLastAppliedIndex();
logger.info("[{}][{}] term={} ledgerBegin={} ledgerEnd={} committed={} watermarks={} appliedIndex={}",
memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm), lastAppliedIndex);
logger.info("[{}][{}] term={} ledgerBeforeBegin={} ledgerEnd={} committed={} watermarks={} appliedIndex={}",
memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeforeBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm), lastAppliedIndex);
} else {
logger.info("[{}][{}] term={} ledgerBegin={} ledgerEnd={} committed={} watermarks={}",
memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm));
logger.info("[{}][{}] term={} ledgerBeforeBegin={} ledgerEnd={} committed={} watermarks={}",
memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeforeBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm));
}
lastPrintWatermarkTimeMs = System.currentTimeMillis();
}
Expand Down Expand Up @@ -320,10 +320,11 @@ public void doWork() {
final Optional<StateMachineCaller> fsmCaller = DLedgerEntryPusher.this.fsmCaller;
if (fsmCaller.isPresent()) {
// If a state machine exists
DLedgerEntryPusher.this.dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);
final StateMachineCaller caller = fsmCaller.get();
caller.onCommitted(quorumIndex);

if (quorumIndex > this.lastQuorumIndex) {
DLedgerEntryPusher.this.dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);
caller.onCommitted(quorumIndex);
}
// Check elapsed
if (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000) {
updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex());
Expand All @@ -335,7 +336,9 @@ public void doWork() {
waitForRunning(1);
}
} else {
dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);
if (quorumIndex > this.lastQuorumIndex) {
dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);
}
ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>> responses = pendingAppendResponsesByTerm.get(currTerm);
boolean needCheck = false;
int ackNum = 0;
Expand Down Expand Up @@ -733,8 +736,8 @@ private void doCompare() throws Exception {
if (compareIndex == -1) {
compareIndex = dLedgerStore.getLedgerEndIndex();
logger.info("[Push-{}][DoCompare] compareIndex=-1 means start to compare", peerId);
} else if (compareIndex > dLedgerStore.getLedgerEndIndex() || compareIndex < dLedgerStore.getLedgerBeginIndex()) {
logger.info("[Push-{}][DoCompare] compareIndex={} out of range {}-{}", peerId, compareIndex, dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex());
} else if (compareIndex > dLedgerStore.getLedgerEndIndex() || compareIndex <= dLedgerStore.getLedgerBeforeBeginIndex()) {
logger.info("[Push-{}][DoCompare] compareIndex={} out of range ({}-{}]", peerId, compareIndex, dLedgerStore.getLedgerBeforeBeginIndex(), dLedgerStore.getLedgerEndIndex());
compareIndex = dLedgerStore.getLedgerEndIndex();
}

Expand All @@ -761,21 +764,21 @@ private void doCompare() throws Exception {
} else {
truncateIndex = compareIndex;
}
} else if (response.getEndIndex() < dLedgerStore.getLedgerBeginIndex()
} else if (response.getEndIndex() <= dLedgerStore.getLedgerBeforeBeginIndex()
|| response.getBeginIndex() > dLedgerStore.getLedgerEndIndex()) {
/*
The follower's entries do not intersect with the leader's.
This usually happens when the follower has been down for a long time and the leader has deleted the expired entries.
Just truncate the follower.
*/
truncateIndex = dLedgerStore.getLedgerBeginIndex();
truncateIndex = dLedgerStore.getLedgerBeforeBeginIndex() + 1;
} else if (compareIndex < response.getBeginIndex()) {
/*
The compared index is smaller than the follower's begin index.
This rarely happens and usually indicates disk damage.
Just truncate the follower.
*/
truncateIndex = dLedgerStore.getLedgerBeginIndex();
truncateIndex = dLedgerStore.getLedgerBeforeBeginIndex() + 1;
} else if (compareIndex > response.getEndIndex()) {
/*
The compared index is bigger than the follower's end index.
Expand All @@ -791,8 +794,8 @@ private void doCompare() throws Exception {
/*
The compared index is smaller than the leader's begin index, truncate the follower.
*/
if (compareIndex < dLedgerStore.getLedgerBeginIndex()) {
truncateIndex = dLedgerStore.getLedgerBeginIndex();
if (compareIndex <= dLedgerStore.getLedgerBeforeBeginIndex()) {
truncateIndex = dLedgerStore.getLedgerBeforeBeginIndex() + 1;
}
/*
If get value for truncateIndex, do it right now.
Expand Down Expand Up @@ -890,7 +893,7 @@ private PushEntryResponse buildResponse(PushEntryRequest request, int code) {
response.setIndex(request.getFirstEntryIndex());
response.setCount(request.getCount());
}
response.setBeginIndex(dLedgerStore.getLedgerBeginIndex());
response.setBeginIndex(dLedgerStore.getLedgerBeforeBeginIndex() + 1);
response.setEndIndex(dLedgerStore.getLedgerEndIndex());
return response;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public class SnapshotManager {
public static final String SNAPSHOT_TEMP_DIR = "tmp";

private DLedgerServer dLedgerServer;
private long lastSnapshotIndex;
private long lastSnapshotTerm;
private long lastSnapshotIndex = -1;
private long lastSnapshotTerm = -1;
private final SnapshotStore snapshotStore;
private volatile boolean savingSnapshot;
private volatile boolean loadingSnapshot;
Expand Down Expand Up @@ -126,7 +126,7 @@ public void saveSnapshot(DLedgerEntry dLedgerEntry) {
return;
}
// Check if applied index reaching the snapshot threshold
if (dLedgerEntry.getIndex() - this.lastSnapshotIndex <= this.dLedgerServer.getDLedgerConfig().getSnapshotThreshold()) {
if (dLedgerEntry.getIndex() - this.lastSnapshotIndex < this.dLedgerServer.getDLedgerConfig().getSnapshotThreshold()) {
return;
}
// Create snapshot writer
Expand Down Expand Up @@ -172,7 +172,7 @@ private void saveSnapshotAfter(SnapshotWriter writer, SnapshotMeta snapshotMeta,

private void truncatePrefix(DLedgerEntry entry) {
deleteExpiredSnapshot();
this.dLedgerServer.getFsmCaller().getdLedgerStore().resetOffsetAfterSnapshot(entry);
this.dLedgerServer.getDLedgerStore().resetOffsetAfterSnapshot(entry);
}

private void deleteExpiredSnapshot() {
Expand Down Expand Up @@ -221,6 +221,7 @@ private void loadSnapshotAfter(SnapshotReader reader, SnapshotMeta snapshotMeta,
this.lastSnapshotIndex = snapshotMeta.getLastIncludedIndex();
this.lastSnapshotTerm = snapshotMeta.getLastIncludedTerm();
this.loadingSnapshot = false;
this.dLedgerServer.getDLedgerStore().updateIndexAfterLoadingSnapshot(this.lastSnapshotIndex, this.lastSnapshotTerm);
logger.info("Snapshot {} loaded successfully", snapshotMeta);
} else {
// Stop the loading process if the snapshot is expired
Expand All @@ -244,7 +245,7 @@ private void loadSnapshotAfter(SnapshotReader reader, SnapshotMeta snapshotMeta,
}
if (failed) {
// Still able to recover from files if the beginning index of file store is 0
if (this.dLedgerServer.getFsmCaller().getdLedgerStore().getLedgerBeginIndex() == 0) {
if (this.dLedgerServer.getFsmCaller().getdLedgerStore().getLedgerBeforeBeginIndex() == -1) {
this.loadingSnapshot = false;
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,6 @@ public void setSnapshotMeta(SnapshotMeta snapshotMeta) {
}

public long getSnapshotIndex() {
return this.snapshotMeta != null ? this.snapshotMeta.getLastIncludedIndex() : 0;
return this.snapshotMeta != null ? this.snapshotMeta.getLastIncludedIndex() : -1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,11 @@ private void doCommitted(final long committedIndex) {
if (this.error != null) {
return;
}
if (this.snapshotManager.isLoadingSnapshot()) {
if (this.snapshotManager.isLoadingSnapshot() || this.snapshotManager.isSavingSnapshot()) {
this.scheduledExecutorService.schedule(() -> {
try {
onCommitted(committedIndex);
logger.info("Still loading snapshot, retry the commit task later");
logger.info("Still loading or saving snapshot, retry the commit task later");
} catch (Throwable e) {
e.printStackTrace();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class DLedgerMemoryStore extends DLedgerStore {

private static final Logger LOGGER = LoggerFactory.getLogger(DLedgerMemoryStore.class);

private long ledgerBeforeBeginIndex = -1;
private long ledgerBeginIndex = -1;
private long ledgerEndIndex = -1;
private long committedIndex = -1;
Expand Down Expand Up @@ -61,9 +62,6 @@ public DLedgerEntry appendAsLeader(DLedgerEntry entry) {
LOGGER.debug("[{}] Append as Leader {} {}", memberState.getSelfId(), entry.getIndex(), entry.getBody().length);
}
cachedEntries.put(entry.getIndex(), entry);
if (ledgerBeginIndex == -1) {
ledgerBeginIndex = ledgerEndIndex;
}
updateLedgerEndIndexAndTerm();
return entry;
}
Expand All @@ -74,6 +72,28 @@ public long truncate(DLedgerEntry entry, long leaderTerm, String leaderId) {
return appendAsFollower(entry, leaderTerm, leaderId).getIndex();
}

/**
 * Hook invoked after a snapshot has been saved so the store can drop the prefix the snapshot covers.
 * This store's implementation has an empty body, so the call is a no-op here.
 *
 * @param entry the entry passed by the snapshot manager; unused by this implementation
 */
@Override
public void resetOffsetAfterSnapshot(DLedgerEntry entry) {
// NOTE(review): presumably nothing to truncate because entries live only in memory — confirm.

}

/**
 * Re-bases this store's index bookkeeping after a snapshot has been loaded.
 * Sets both {@code ledgerBeforeBeginIndex} and {@code ledgerEndIndex} to {@code lastIncludedIndex},
 * and {@code ledgerEndTerm} to {@code lastIncludedTerm}. No other field is modified.
 *
 * @param lastIncludedIndex the last index included in the loaded snapshot
 * @param lastIncludedTerm the term associated with {@code lastIncludedIndex}
 */
@Override
public void updateIndexAfterLoadingSnapshot(long lastIncludedIndex, long lastIncludedTerm) {
// Begin-before marker and end marker are both placed on the snapshot's last included index.
this.ledgerBeforeBeginIndex = lastIncludedIndex;
this.ledgerEndIndex = lastIncludedIndex;
this.ledgerEndTerm = lastIncludedTerm;
// NOTE(review): unlike appendAsLeader/appendAsFollower, this does not call updateLedgerEndIndexAndTerm();
// confirm whether the member state also needs the new end index/term here.
}

/**
 * Lifecycle hook for starting the store. The body is empty, so this implementation does nothing.
 */
@Override
public void startup() {

}

/**
 * Lifecycle hook for stopping the store. The body is empty, so this implementation does nothing.
 */
@Override
public void shutdown() {

}

@Override
public DLedgerEntry appendAsFollower(DLedgerEntry entry, long leaderTerm, String leaderId) {
PreConditions.check(memberState.isFollower(), DLedgerResponseCode.NOT_FOLLOWER);
Expand All @@ -88,9 +108,6 @@ public DLedgerEntry appendAsFollower(DLedgerEntry entry, long leaderTerm, String
ledgerEndIndex = entry.getIndex();
committedIndex = entry.getIndex();
cachedEntries.put(entry.getIndex(), entry);
if (ledgerBeginIndex == -1) {
ledgerBeginIndex = ledgerEndIndex;
}
updateLedgerEndIndexAndTerm();
return entry;
}
Expand All @@ -107,11 +124,17 @@ public long getLedgerEndIndex() {
return ledgerEndIndex;
}

/**
 * Returns the current value of the {@code ledgerBeginIndex} field.
 *
 * @return the stored begin index ({@code -1} is the field's initial value)
 * @deprecated see {@link #getLedgerBeforeBeginIndex()}; NOTE(review): presumably callers should use
 *             {@code getLedgerBeforeBeginIndex() + 1} as the first available index — confirm.
 */
@Deprecated
@Override
public long getLedgerBeginIndex() {
return ledgerBeginIndex;
}

/**
 * Returns the current value of the {@code ledgerBeforeBeginIndex} field.
 *
 * @return the stored "before begin" index ({@code -1} is the field's initial value)
 */
@Override
public long getLedgerBeforeBeginIndex() {
return ledgerBeforeBeginIndex;
}

@Override
public long getCommittedIndex() {
return committedIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public void updateCommittedIndex(long term, long committedIndex) {

public abstract long getLedgerBeginIndex();

public abstract long getLedgerBeforeBeginIndex();

protected void updateLedgerEndIndexAndTerm() {
if (getMemberState() != null) {
getMemberState().updateLedgerIndexAndTerm(getLedgerEndIndex(), getLedgerEndTerm());
Expand All @@ -57,15 +59,12 @@ public long truncate(DLedgerEntry entry, long leaderTerm, String leaderId) {
return -1;
}

public void resetOffsetAfterSnapshot(DLedgerEntry entry) {
public abstract void resetOffsetAfterSnapshot(DLedgerEntry entry);

}
public abstract void updateIndexAfterLoadingSnapshot(long lastIncludedIndex, long lastIncludedTerm);

public void startup() {
public abstract void startup();

}
public abstract void shutdown();

public void shutdown() {

}
}
Loading