From de58088082c4325222fd6f525c07a388432a0a5f Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Wed, 12 Oct 2022 16:32:21 +0800 Subject: [PATCH 1/3] fix(main): fix wrong begin index after applying snapshot 1. fix wrong begin index after applying snapshot --- .../storage/dledger/DLedgerEntryPusher.java | 24 +++---- .../dledger/snapshot/SnapshotManager.java | 5 +- .../dledger/store/DLedgerMemoryStore.java | 13 ++-- .../storage/dledger/store/DLedgerStore.java | 2 + .../store/file/DLedgerMmapFileStore.java | 68 +++++++++++++++---- .../storage/dledger/AppendAndPushTest.java | 18 ++--- .../storage/dledger/BatchPushTest.java | 14 ++-- .../dledger/snapshot/SnapshotManagerTest.java | 33 ++++++--- .../store/DLedgerMappedFileStoreTest.java | 22 +++--- 9 files changed, 130 insertions(+), 69 deletions(-) diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java index 3e8dfe36..7d677a35 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java @@ -269,11 +269,11 @@ public void doWork() { if (DLedgerUtils.elapsed(lastPrintWatermarkTimeMs) > 3000) { if (DLedgerEntryPusher.this.fsmCaller.isPresent()) { final long lastAppliedIndex = DLedgerEntryPusher.this.fsmCaller.get().getLastAppliedIndex(); - logger.info("[{}][{}] term={} ledgerBegin={} ledgerEnd={} committed={} watermarks={} appliedIndex={}", - memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm), lastAppliedIndex); + logger.info("[{}][{}] term={} ledgerBeforeBegin={} ledgerEnd={} committed={} watermarks={} appliedIndex={}", + memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeforeBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm), lastAppliedIndex); } else { - logger.info("[{}][{}] term={} ledgerBegin={} ledgerEnd={} committed={} watermarks={}", - memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm)); + logger.info("[{}][{}] term={} ledgerBeforeBegin={} ledgerEnd={} committed={} watermarks={}", + memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeforeBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm)); } lastPrintWatermarkTimeMs = System.currentTimeMillis(); } @@ -733,8 +733,8 @@ private void doCompare() throws Exception { if (compareIndex == -1) { compareIndex = dLedgerStore.getLedgerEndIndex(); logger.info("[Push-{}][DoCompare] compareIndex=-1 means start to compare", peerId); - } else if (compareIndex > dLedgerStore.getLedgerEndIndex() || compareIndex < dLedgerStore.getLedgerBeginIndex()) { - logger.info("[Push-{}][DoCompare] compareIndex={} out of range {}-{}", peerId, compareIndex, dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex()); + } else if (compareIndex > dLedgerStore.getLedgerEndIndex() || compareIndex <= dLedgerStore.getLedgerBeforeBeginIndex()) { + logger.info("[Push-{}][DoCompare] compareIndex={} out of range ({}-{}]", peerId, compareIndex, dLedgerStore.getLedgerBeforeBeginIndex(), dLedgerStore.getLedgerEndIndex()); compareIndex = dLedgerStore.getLedgerEndIndex(); } @@ -761,21 +761,21 @@ private void doCompare() throws Exception { } else { truncateIndex = compareIndex; } - } else if (response.getEndIndex() < dLedgerStore.getLedgerBeginIndex() + } else if (response.getEndIndex() <= dLedgerStore.getLedgerBeforeBeginIndex() || response.getBeginIndex() > dLedgerStore.getLedgerEndIndex()) { /* The follower's entries does not intersect with the leader. This usually happened when the follower has crashed for a long time while the leader has deleted the expired entries. Just truncate the follower. */ - truncateIndex = dLedgerStore.getLedgerBeginIndex(); + truncateIndex = dLedgerStore.getLedgerBeforeBeginIndex() + 1; } else if (compareIndex < response.getBeginIndex()) { /* The compared index is smaller than the follower's begin index. This happened rarely, usually means some disk damage. Just truncate the follower. */ - truncateIndex = dLedgerStore.getLedgerBeginIndex(); + truncateIndex = dLedgerStore.getLedgerBeforeBeginIndex() + 1; } else if (compareIndex > response.getEndIndex()) { /* The compared index is bigger than the follower's end index. @@ -791,8 +791,8 @@ private void doCompare() throws Exception { /* The compared index is smaller than the leader's begin index, truncate the follower. */ - if (compareIndex < dLedgerStore.getLedgerBeginIndex()) { - truncateIndex = dLedgerStore.getLedgerBeginIndex(); + if (compareIndex <= dLedgerStore.getLedgerBeforeBeginIndex()) { + truncateIndex = dLedgerStore.getLedgerBeforeBeginIndex() + 1; } /* If get value for truncateIndex, do it right now. @@ -890,7 +890,7 @@ private PushEntryResponse buildResponse(PushEntryRequest request, int code) { response.setIndex(request.getFirstEntryIndex()); response.setCount(request.getCount()); } - response.setBeginIndex(dLedgerStore.getLedgerBeginIndex()); + response.setBeginIndex(dLedgerStore.getLedgerBeforeBeginIndex() + 1); response.setEndIndex(dLedgerStore.getLedgerEndIndex()); return response; } diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotManager.java b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotManager.java index a537287f..ef8fb25e 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotManager.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotManager.java @@ -164,6 +164,7 @@ private void saveSnapshotAfter(SnapshotWriter writer, SnapshotMeta snapshotMeta, CompletableFuture.runAsync(() -> { truncatePrefix(dLedgerEntry); }); + //truncatePrefix(dLedgerEntry); } else { logger.error("Unable to save snapshot"); } @@ -172,7 +173,7 @@ private void saveSnapshotAfter(SnapshotWriter writer, SnapshotMeta snapshotMeta, private void truncatePrefix(DLedgerEntry entry) { deleteExpiredSnapshot(); - this.dLedgerServer.getFsmCaller().getdLedgerStore().resetOffsetAfterSnapshot(entry); + this.dLedgerServer.getDLedgerStore().resetOffsetAfterSnapshot(entry); } private void deleteExpiredSnapshot() { @@ -244,7 +245,7 @@ private void loadSnapshotAfter(SnapshotReader reader, SnapshotMeta snapshotMeta, } if (failed) { // Still able to recover from files if the beginning index of file store is 0 - if (this.dLedgerServer.getFsmCaller().getdLedgerStore().getLedgerBeginIndex() == 0) { + if (this.dLedgerServer.getFsmCaller().getdLedgerStore().getLedgerBeforeBeginIndex() == -1) { this.loadingSnapshot = false; return; } diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerMemoryStore.java b/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerMemoryStore.java index 542c3c2c..0fe63556 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerMemoryStore.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerMemoryStore.java @@ -32,6 +32,7 @@ public class DLedgerMemoryStore extends DLedgerStore { private static final Logger LOGGER = LoggerFactory.getLogger(DLedgerMemoryStore.class); + private long ledgerBeforeBeginIndex = -1; private long ledgerBeginIndex = -1; private long ledgerEndIndex = -1; private long committedIndex = -1; @@ -61,9 +62,6 @@ public DLedgerEntry appendAsLeader(DLedgerEntry entry) { LOGGER.debug("[{}] Append as Leader {} {}", memberState.getSelfId(), entry.getIndex(), entry.getBody().length); } cachedEntries.put(entry.getIndex(), entry); - if (ledgerBeginIndex == -1) { - ledgerBeginIndex = ledgerEndIndex; - } updateLedgerEndIndexAndTerm(); return entry; } @@ -88,9 +86,6 @@ public DLedgerEntry appendAsFollower(DLedgerEntry entry, long leaderTerm, String ledgerEndIndex = entry.getIndex(); committedIndex = entry.getIndex(); cachedEntries.put(entry.getIndex(), entry); - if (ledgerBeginIndex == -1) { - ledgerBeginIndex = ledgerEndIndex; - } updateLedgerEndIndexAndTerm(); return entry; } @@ -107,11 +102,17 @@ public long getLedgerEndIndex() { return ledgerEndIndex; } + @Deprecated @Override public long getLedgerBeginIndex() { return ledgerBeginIndex; } + @Override + public long getLedgerBeforeBeginIndex() { + return ledgerBeforeBeginIndex; + } + @Override public long getCommittedIndex() { return committedIndex; diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerStore.java b/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerStore.java index 6f1f981b..013dc087 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerStore.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerStore.java @@ -43,6 +43,8 @@ public void updateCommittedIndex(long term, long committedIndex) { public abstract long getLedgerBeginIndex(); + public abstract long getLedgerBeforeBeginIndex(); + protected void updateLedgerEndIndexAndTerm() { if (getMemberState() != null) { getMemberState().updateLedgerIndexAndTerm(getLedgerEndIndex(), getLedgerEndTerm()); diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java b/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java index 08fb677e..a8c5375d 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java @@ -50,6 +50,8 @@ public class DLedgerMmapFileStore extends DLedgerStore { private static final Logger LOGGER = LoggerFactory.getLogger(DLedgerMmapFileStore.class); public List appendHooks = new ArrayList<>(); + + private long ledgerBeforeBeginIndex = -1; private long ledgerBeginIndex = -1; private long ledgerEndIndex = -1; private long committedIndex = -1; @@ -301,7 +303,7 @@ public void recover() { DLedgerEntry entry = get(lastEntryIndex); PreConditions.check(entry != null, DLedgerResponseCode.DISK_ERROR, "recheck get null entry"); PreConditions.check(entry.getIndex() == lastEntryIndex, DLedgerResponseCode.DISK_ERROR, "recheck index %d != %d", entry.getIndex(), lastEntryIndex); - reviseLedgerBeginIndex(); + reviseLedgerBeforeBeginIndex(); } this.dataFileList.updateWherePosition(processOffset); this.dataFileList.truncateOffset(processOffset); @@ -325,6 +327,7 @@ public void recover() { } + @Deprecated private void reviseLedgerBeginIndex() { //get ledger begin index MmapFile firstFile = dataFileList.getFirstMappedFile(); @@ -341,6 +344,30 @@ private void reviseLedgerBeginIndex() { } } + private void reviseLedgerBeforeBeginIndex() { + // get ledger begin index + System.out.println(this.memberState.getSelfId() + " start to revise before index, now before index = " + this.ledgerBeforeBeginIndex); + MmapFile firstFile = dataFileList.getFirstMappedFile(); + SelectMmapBufferResult sbr = firstFile.selectMappedBuffer(0); + try { + ByteBuffer tmpBuffer = sbr.getByteBuffer(); + tmpBuffer.position(firstFile.getStartPosition()); + tmpBuffer.getInt(); //magic + int size = tmpBuffer.getInt();//size + if (size == 0) { + // means that now empty entry + return; + } + // begin index + long beginIndex = tmpBuffer.getLong(); + System.out.println(this.memberState.getSelfId() + " update before index from " + this.ledgerBeginIndex + " to " + (beginIndex - 1)); + this.ledgerBeforeBeginIndex = beginIndex - 1; + indexFileList.resetOffset(beginIndex * INDEX_UNIT_SIZE); + } finally { + SelectMmapBufferResult.release(sbr); + } + } + @Override public DLedgerEntry appendAsLeader(DLedgerEntry entry) { PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER); @@ -375,9 +402,9 @@ public DLedgerEntry appendAsLeader(DLedgerEntry entry) { } ledgerEndIndex++; ledgerEndTerm = memberState.currTerm(); - if (ledgerBeginIndex == -1) { - ledgerBeginIndex = ledgerEndIndex; - } +// if (ledgerBeginIndex == -1) { +// ledgerBeginIndex = ledgerEndIndex; +// } updateLedgerEndIndexAndTerm(); return entry; } @@ -428,7 +455,7 @@ public long truncate(DLedgerEntry entry, long leaderTerm, String leaderId) { PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null); ledgerEndTerm = entry.getTerm(); ledgerEndIndex = entry.getIndex(); - reviseLedgerBeginIndex(); + reviseLedgerBeforeBeginIndex(); updateLedgerEndIndexAndTerm(); return entry.getIndex(); } @@ -470,11 +497,18 @@ private long calculateWherePosition(final MmapFileList mappedFileList, long cont @Override public void resetOffsetAfterSnapshot(DLedgerEntry entry) { + // judge expired + if (entry.getIndex() <= this.ledgerBeforeBeginIndex) { + return; + } + System.out.println(this.memberState.getSelfId() + " reset offset after snapshot, now before index = " + this.ledgerBeforeBeginIndex + ", snapshot last included index = " + entry.getIndex()); long resetPos = entry.getPos() + entry.getSize(); dataFileList.resetOffset(resetPos); long resetIndexOffset = entry.getIndex() * INDEX_UNIT_SIZE; indexFileList.resetOffset(resetIndexOffset); - reviseLedgerBeginIndex(); + // reset ledgerBeforeBeginIndex + System.out.println(this.memberState.getSelfId() + " update before index from " + this.ledgerBeginIndex + " to " + entry.getIndex()); + this.ledgerBeforeBeginIndex = entry.getIndex(); } @Override @@ -498,9 +532,9 @@ public DLedgerEntry appendAsFollower(DLedgerEntry entry, long leaderTerm, String PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null); ledgerEndTerm = entry.getTerm(); ledgerEndIndex = entry.getIndex(); - if (ledgerBeginIndex == -1) { - ledgerBeginIndex = ledgerEndIndex; - } +// if (ledgerBeginIndex == -1) { +// ledgerBeginIndex = ledgerEndIndex; +// } updateLedgerEndIndexAndTerm(); return entry; } @@ -535,11 +569,17 @@ public long getLedgerEndIndex() { return ledgerEndIndex; } + @Deprecated @Override public long getLedgerBeginIndex() { return ledgerBeginIndex; } + @Override + public long getLedgerBeforeBeginIndex() { + return ledgerBeforeBeginIndex; + } + @Override public DLedgerEntry get(Long index) { indexCheck(index); @@ -582,8 +622,8 @@ public Pair getEntryPosAndSize(Long index) { public void indexCheck(Long index) { PreConditions.check(index >= 0, DLedgerResponseCode.INDEX_OUT_OF_RANGE, "%d should gt 0", index); - PreConditions.check(index >= ledgerBeginIndex, DLedgerResponseCode.INDEX_LESS_THAN_LOCAL_BEGIN, "%d should be gt %d, ledgerBeginIndex may be revised", index, ledgerBeginIndex); - PreConditions.check(index <= ledgerEndIndex, DLedgerResponseCode.INDEX_OUT_OF_RANGE, "%d should between %d-%d", index, ledgerBeginIndex, ledgerEndIndex); + PreConditions.check(index > ledgerBeforeBeginIndex, DLedgerResponseCode.INDEX_LESS_THAN_LOCAL_BEGIN, "%d should be gt %d, beforeBeginIndex may be revised", index, ledgerBeforeBeginIndex); + PreConditions.check(index <= ledgerEndIndex, DLedgerResponseCode.INDEX_OUT_OF_RANGE, "%d should between (%d-%d]", index, ledgerBeforeBeginIndex, ledgerEndIndex); } @Override @@ -600,8 +640,8 @@ public void updateCommittedIndex(long term, long newCommittedIndex) { return; } if (newCommittedIndex < this.committedIndex - || newCommittedIndex < this.ledgerBeginIndex) { - LOGGER.warn("[MONITOR]Skip update committed index for new={} < old={} or new={} < beginIndex={}", newCommittedIndex, this.committedIndex, newCommittedIndex, this.ledgerBeginIndex); + || newCommittedIndex <= this.ledgerBeforeBeginIndex) { + LOGGER.warn("[MONITOR]Skip update committed index for new={} < old={} or new={} <= beforeBeginIndex={}", newCommittedIndex, this.committedIndex, newCommittedIndex, this.ledgerBeforeBeginIndex); return; } long endIndex = ledgerEndIndex; @@ -727,7 +767,7 @@ public CleanSpaceService(String name, Logger logger) { count, timeUp, checkExpired, forceClean, enableForceClean, isDiskFull, storeBaseRatio, dataRatio); } if (count > 0) { - DLedgerMmapFileStore.this.reviseLedgerBeginIndex(); + DLedgerMmapFileStore.this.reviseLedgerBeforeBeginIndex(); } } getDataFileList().retryDeleteFirstFile(intervalForcibly); diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/AppendAndPushTest.java b/dledger/src/test/java/io/openmessaging/storage/dledger/AppendAndPushTest.java index 09c4e693..09c4be25 100644 --- a/dledger/src/test/java/io/openmessaging/storage/dledger/AppendAndPushTest.java +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/AppendAndPushTest.java @@ -144,9 +144,9 @@ public void testPushNetworkNotStable() throws Exception { Thread.sleep(1500); Assertions.assertTrue(sendSuccess.get()); - Assertions.assertEquals(0, dLedgerServer0.getDLedgerStore().getLedgerBeginIndex()); + Assertions.assertEquals(-1, dLedgerServer0.getDLedgerStore().getLedgerBeforeBeginIndex()); Assertions.assertEquals(1, dLedgerServer0.getDLedgerStore().getLedgerEndIndex()); - Assertions.assertEquals(0, dLedgerServer1.getDLedgerStore().getLedgerBeginIndex()); + Assertions.assertEquals(-1, dLedgerServer1.getDLedgerStore().getLedgerBeforeBeginIndex()); Assertions.assertEquals(1, dLedgerServer1.getDLedgerStore().getLedgerEndIndex()); } @@ -176,10 +176,10 @@ public void testPushMissed() throws Exception { Assertions.assertEquals(appendEntryResponse.getCode(), DLedgerResponseCode.SUCCESS.getCode()); Assertions.assertEquals(i, appendEntryResponse.getIndex()); } - Assertions.assertEquals(0, dLedgerServer0.getDLedgerStore().getLedgerBeginIndex()); + Assertions.assertEquals(-1, dLedgerServer0.getDLedgerStore().getLedgerBeforeBeginIndex()); Assertions.assertEquals(9, dLedgerServer0.getDLedgerStore().getLedgerEndIndex()); - Assertions.assertEquals(0, dLedgerServer1.getDLedgerStore().getLedgerBeginIndex()); + Assertions.assertEquals(-1, dLedgerServer1.getDLedgerStore().getLedgerBeforeBeginIndex()); Assertions.assertEquals(9, dLedgerServer1.getDLedgerStore().getLedgerEndIndex()); } @@ -194,7 +194,7 @@ public void testPushTruncate() throws Exception { DLedgerEntry resEntry = dLedgerServer0.getDLedgerStore().appendAsLeader(entry); Assertions.assertEquals(i, resEntry.getIndex()); } - Assertions.assertEquals(0, dLedgerServer0.getDLedgerStore().getLedgerBeginIndex()); + Assertions.assertEquals(-1, dLedgerServer0.getDLedgerStore().getLedgerBeforeBeginIndex()); Assertions.assertEquals(9, dLedgerServer0.getDLedgerStore().getLedgerEndIndex()); List entries = new ArrayList<>(); for (long i = 0; i < 10; i++) { @@ -213,9 +213,9 @@ public void testPushTruncate() throws Exception { dLedgerServer1 = launchServer(group, peers, "n1", "n1", DLedgerConfig.FILE); dLedgerServer0 = launchServer(group, peers, "n0", "n1", DLedgerConfig.FILE); Thread.sleep(1000); - Assertions.assertEquals(0, dLedgerServer0.getDLedgerStore().getLedgerBeginIndex()); + Assertions.assertEquals(-1, dLedgerServer0.getDLedgerStore().getLedgerBeforeBeginIndex()); Assertions.assertEquals(4, dLedgerServer0.getDLedgerStore().getLedgerEndIndex()); - Assertions.assertEquals(0, dLedgerServer1.getDLedgerStore().getLedgerBeginIndex()); + Assertions.assertEquals(-1, dLedgerServer1.getDLedgerStore().getLedgerBeforeBeginIndex()); Assertions.assertEquals(4, dLedgerServer1.getDLedgerStore().getLedgerEndIndex()); for (int i = 0; i < 10; i++) { AppendEntryRequest request = new AppendEntryRequest(); @@ -261,10 +261,10 @@ public void testBatchAppend() throws Exception { Assertions.assertEquals(appendEntryResponse.getCode(), DLedgerResponseCode.SUCCESS.getCode()); Assertions.assertEquals(count - 1, appendEntryResponse.getIndex()); - Assertions.assertEquals(0, dLedgerServer0.getDLedgerStore().getLedgerBeginIndex()); + Assertions.assertEquals(-1, dLedgerServer0.getDLedgerStore().getLedgerBeforeBeginIndex()); Assertions.assertEquals(count - 1, dLedgerServer0.getDLedgerStore().getLedgerEndIndex()); - Assertions.assertEquals(0, dLedgerServer1.getDLedgerStore().getLedgerBeginIndex()); + Assertions.assertEquals(-1, dLedgerServer1.getDLedgerStore().getLedgerBeforeBeginIndex()); Assertions.assertEquals(count - 1, dLedgerServer1.getDLedgerStore().getLedgerEndIndex()); Thread.sleep(1000); } diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/BatchPushTest.java b/dledger/src/test/java/io/openmessaging/storage/dledger/BatchPushTest.java index 9434d41d..040ccea9 100644 --- a/dledger/src/test/java/io/openmessaging/storage/dledger/BatchPushTest.java +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/BatchPushTest.java @@ -178,9 +178,9 @@ public void testBatchPushNetworkNotStable() throws Exception { Thread.sleep(1500); Assertions.assertTrue(sendSuccess.get()); - Assertions.assertEquals(0, dLedgerServer0.getDLedgerStore().getLedgerBeginIndex()); + Assertions.assertEquals(-1, dLedgerServer0.getDLedgerStore().getLedgerBeforeBeginIndex()); Assertions.assertEquals(1, dLedgerServer0.getDLedgerStore().getLedgerEndIndex()); - Assertions.assertEquals(0, dLedgerServer1.getDLedgerStore().getLedgerBeginIndex()); + Assertions.assertEquals(-1, dLedgerServer1.getDLedgerStore().getLedgerBeforeBeginIndex()); Assertions.assertEquals(1, dLedgerServer1.getDLedgerStore().getLedgerEndIndex()); } @@ -210,10 +210,10 @@ public void testBatchPushMissed() throws Exception { Assertions.assertEquals(appendEntryResponse.getCode(), DLedgerResponseCode.SUCCESS.getCode()); Assertions.assertEquals(i, appendEntryResponse.getIndex()); } - Assertions.assertEquals(0, dLedgerServer0.getDLedgerStore().getLedgerBeginIndex()); + Assertions.assertEquals(-1, dLedgerServer0.getDLedgerStore().getLedgerBeforeBeginIndex()); Assertions.assertEquals(9, dLedgerServer0.getDLedgerStore().getLedgerEndIndex()); - Assertions.assertEquals(0, dLedgerServer1.getDLedgerStore().getLedgerBeginIndex()); + Assertions.assertEquals(-1, dLedgerServer1.getDLedgerStore().getLedgerBeforeBeginIndex()); Assertions.assertEquals(9, dLedgerServer1.getDLedgerStore().getLedgerEndIndex()); } @@ -228,7 +228,7 @@ public void testBatchPushTruncate() throws Exception { DLedgerEntry resEntry = dLedgerServer0.getDLedgerStore().appendAsLeader(entry); Assertions.assertEquals(i, resEntry.getIndex()); } - Assertions.assertEquals(0, dLedgerServer0.getDLedgerStore().getLedgerBeginIndex()); + Assertions.assertEquals(-1, dLedgerServer0.getDLedgerStore().getLedgerBeforeBeginIndex()); Assertions.assertEquals(9, dLedgerServer0.getDLedgerStore().getLedgerEndIndex()); List entries = new ArrayList<>(); for (long i = 0; i < 10; i++) { @@ -247,9 +247,9 @@ public void testBatchPushTruncate() throws Exception { dLedgerServer1 = launchServerEnableBatchPush(group, peers, "n1", "n1", DLedgerConfig.FILE); dLedgerServer0 = launchServerEnableBatchPush(group, peers, "n0", "n1", DLedgerConfig.FILE); Thread.sleep(1000); - Assertions.assertEquals(0, dLedgerServer0.getDLedgerStore().getLedgerBeginIndex()); + Assertions.assertEquals(-1, dLedgerServer0.getDLedgerStore().getLedgerBeforeBeginIndex()); Assertions.assertEquals(4, dLedgerServer0.getDLedgerStore().getLedgerEndIndex()); - Assertions.assertEquals(0, dLedgerServer1.getDLedgerStore().getLedgerBeginIndex()); + Assertions.assertEquals(-1, dLedgerServer1.getDLedgerStore().getLedgerBeforeBeginIndex()); Assertions.assertEquals(4, dLedgerServer1.getDLedgerStore().getLedgerEndIndex()); for (int i = 0; i < 10; i++) { AppendEntryRequest request = new AppendEntryRequest(); diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotManagerTest.java b/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotManagerTest.java index a71334b7..0cb27602 100644 --- a/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotManagerTest.java +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotManagerTest.java @@ -46,16 +46,30 @@ public void testSaveAndLoadSnapshot() throws InterruptedException { assertEquals(DLedgerResponseCode.SUCCESS.getCode(), appendEntryResponse.getCode()); assertEquals(i, appendEntryResponse.getIndex()); } - Thread.sleep(1200); - for (DLedgerServer server : serverList) { - assertEquals(99, server.getdLedgerStore().getLedgerEndIndex()); - } - // Check state machine + Thread.sleep(5000); for (DLedgerServer server : serverList) { + assertEquals(99, server.getDLedgerStore().getLedgerEndIndex()); + assertEquals(99, server.getDLedgerStore().getLedgerBeforeBeginIndex()); + // check statemachine final MockStateMachine fsm = (MockStateMachine) server.getStateMachine(); assertEquals(99, fsm.getAppliedIndex()); assertEquals(100, fsm.getTotalEntries()); } + + AppendEntryResponse appendEntryResponse = dLedgerClient.append(new byte[512]); + assertEquals(DLedgerResponseCode.SUCCESS.getCode(), appendEntryResponse.getCode()); + assertEquals(100, appendEntryResponse.getIndex()); + + Thread.sleep(5000); + for (DLedgerServer server : serverList) { + assertEquals(100, server.getDLedgerStore().getLedgerEndIndex()); + assertEquals(99, server.getDLedgerStore().getLedgerBeforeBeginIndex()); + // check statemachine + final MockStateMachine fsm = (MockStateMachine) server.getStateMachine(); + assertEquals(100, fsm.getAppliedIndex()); + assertEquals(101, fsm.getTotalEntries()); + } + Thread.sleep(100); // Shutdown server dLedgerServer0.shutdown(); @@ -69,12 +83,15 @@ public void testSaveAndLoadSnapshot() throws InterruptedException { serverList.add(newDLedgerServer0); serverList.add(newDLedgerServer1); serverList.add(newDLedgerServer2); - Thread.sleep(1000); + Thread.sleep(5000); // State machine could only be recovered from snapshot due to the entry has been removed after saving snapshot for (DLedgerServer server : serverList) { + assertEquals(100, server.getDLedgerStore().getLedgerEndIndex()); + assertEquals(99, server.getDLedgerStore().getLedgerBeforeBeginIndex()); + // check statemachine final MockStateMachine fsm = (MockStateMachine) server.getStateMachine(); - assertEquals(99, server.getFsmCaller().getLastAppliedIndex()); - assertEquals(100, fsm.getTotalEntries()); + assertEquals(100, fsm.getAppliedIndex()); + assertEquals(101, fsm.getTotalEntries()); } } diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/store/DLedgerMappedFileStoreTest.java b/dledger/src/test/java/io/openmessaging/storage/dledger/store/DLedgerMappedFileStoreTest.java index c07f77fb..e59d78ab 100644 --- a/dledger/src/test/java/io/openmessaging/storage/dledger/store/DLedgerMappedFileStoreTest.java +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/store/DLedgerMappedFileStoreTest.java @@ -107,7 +107,7 @@ public void testCommittedIndex() throws Exception { } fileStore.shutdown(); fileStore = createFileStore(group, peers, "n0", "n0"); - Assertions.assertEquals(0, fileStore.getLedgerBeginIndex()); + Assertions.assertEquals(-1, fileStore.getLedgerBeforeBeginIndex()); Assertions.assertEquals(99, fileStore.getLedgerEndIndex()); Assertions.assertEquals(90, fileStore.getCommittedIndex()); } @@ -184,7 +184,7 @@ public void testNormalRecovery() { } fileStore.shutdown(); fileStore = createFileStore(group, peers, "n0", "n0"); - Assertions.assertEquals(0, fileStore.getLedgerBeginIndex()); + Assertions.assertEquals(-1, fileStore.getLedgerBeforeBeginIndex()); Assertions.assertEquals(9, fileStore.getLedgerEndIndex()); for (long i = 0; i < 10; i++) { DLedgerEntry entry = fileStore.get(i); @@ -207,7 +207,7 @@ public void testAbnormalRecovery() { } Assertions.assertEquals(12, fileStore.getDataFileList().getMappedFiles().size()); Assertions.assertEquals(99, fileStore.getLedgerEndIndex()); - Assertions.assertEquals(0, fileStore.getLedgerBeginIndex()); + Assertions.assertEquals(-1, fileStore.getLedgerBeforeBeginIndex()); while (fileStore.getFlushPos() != fileStore.getWritePos()) { fileStore.flush(); } @@ -216,7 +216,7 @@ public void testAbnormalRecovery() { { DLedgerMmapFileStore fileStore = createFileStore(group, peers, "n0", "n0", 10 * 1024 + MIN_BLANK_LEN, 10 * DLedgerMmapFileStore.INDEX_UNIT_SIZE, 2); Assertions.assertEquals(10, fileStore.getDataFileList().getMappedFiles().size()); - Assertions.assertEquals(0, fileStore.getLedgerBeginIndex()); + Assertions.assertEquals(-1, fileStore.getLedgerBeforeBeginIndex()); Assertions.assertEquals(89, fileStore.getLedgerEndIndex()); for (long i = 0; i < 89; i++) { DLedgerEntry entry = fileStore.get(i); @@ -227,7 +227,7 @@ public void testAbnormalRecovery() { { DLedgerMmapFileStore fileStore = createFileStore(group, peers, "n0", "n0", 10 * 1024 + MIN_BLANK_LEN, 10 * DLedgerMmapFileStore.INDEX_UNIT_SIZE, 10); Assertions.assertEquals(0, fileStore.getDataFileList().getMappedFiles().size()); - Assertions.assertEquals(-1, fileStore.getLedgerBeginIndex()); + Assertions.assertEquals(-1, fileStore.getLedgerBeforeBeginIndex()); Assertions.assertEquals(-1, fileStore.getLedgerEndIndex()); fileStore.shutdown(); } @@ -245,7 +245,7 @@ public void testTruncate() { Assertions.assertEquals(i, resEntry.getIndex()); } Assertions.assertEquals(2, fileStore.getDataFileList().getMappedFiles().size()); - Assertions.assertEquals(0, fileStore.getLedgerBeginIndex()); + Assertions.assertEquals(-1, fileStore.getLedgerBeforeBeginIndex()); Assertions.assertEquals(9, fileStore.getLedgerEndIndex()); fileStore.getMemberState().changeToFollower(fileStore.getLedgerEndTerm(), "n0"); @@ -257,7 +257,7 @@ public void testTruncate() { Assertions.assertNotNull(midEntry); long midIndex = fileStore.truncate(midEntry, fileStore.getLedgerEndTerm(), "n0"); Assertions.assertEquals(5, midIndex); - Assertions.assertEquals(0, fileStore.getLedgerBeginIndex()); + Assertions.assertEquals(-1, fileStore.getLedgerBeforeBeginIndex()); Assertions.assertEquals(5, fileStore.getLedgerEndIndex()); Assertions.assertEquals(midEntry.getPos() + midEntry.getSize(), fileStore.getDataFileList().getMaxWrotePosition()); Assertions.assertEquals((midIndex + 1) * DLedgerMmapFileStore.INDEX_UNIT_SIZE, fileStore.getIndexFileList().getMaxWrotePosition()); @@ -268,7 +268,7 @@ public void testTruncate() { Assertions.assertNotNull(afterEntry); long afterIndex = fileStore.truncate(afterEntry, fileStore.getLedgerEndTerm(), "n0"); Assertions.assertEquals(6, afterIndex); - Assertions.assertEquals(0, fileStore.getLedgerBeginIndex()); + Assertions.assertEquals(-1, fileStore.getLedgerBeforeBeginIndex()); Assertions.assertEquals(6, fileStore.getLedgerEndIndex()); Assertions.assertEquals(afterEntry.getPos() + afterEntry.getSize(), fileStore.getDataFileList().getMaxWrotePosition()); Assertions.assertEquals((afterIndex + 1) * DLedgerMmapFileStore.INDEX_UNIT_SIZE, fileStore.getIndexFileList().getMaxWrotePosition()); @@ -281,7 +281,7 @@ public void testTruncate() { long endIndex = fileStore.truncate(endEntry, fileStore.getLedgerEndTerm(), "n0"); Assertions.assertEquals(9, endIndex); Assertions.assertEquals(9, fileStore.getLedgerEndIndex()); - Assertions.assertEquals(9, fileStore.getLedgerBeginIndex()); + Assertions.assertEquals(8, fileStore.getLedgerBeforeBeginIndex()); Assertions.assertEquals(endEntry.getPos() + endEntry.getSize(), fileStore.getDataFileList().getMaxWrotePosition()); Assertions.assertEquals((endIndex + 1) * DLedgerMmapFileStore.INDEX_UNIT_SIZE, fileStore.getIndexFileList().getMaxWrotePosition()); } @@ -323,7 +323,7 @@ public void testReviseWherePosition() throws Exception { fileStore.getDataFileList().flush(0); Assertions.assertEquals(3, fileStore.getDataFileList().getMappedFiles().size()); - Assertions.assertEquals(0, fileStore.getLedgerBeginIndex()); + Assertions.assertEquals(-1, fileStore.getLedgerBeforeBeginIndex()); Assertions.assertEquals(19, fileStore.getLedgerEndIndex()); fileStore.getMemberState().changeToFollower(fileStore.getLedgerEndTerm(), "n0"); @@ -338,7 +338,7 @@ public void testReviseWherePosition() throws Exception { Assertions.assertNotNull(entry); long index = fileStore.truncate(entry, fileStore.getLedgerEndTerm(), "n0"); Assertions.assertEquals(15, index); - Assertions.assertEquals(14, fileStore.getLedgerBeginIndex()); + Assertions.assertEquals(13, fileStore.getLedgerBeforeBeginIndex()); Assertions.assertEquals(15, fileStore.getLedgerEndIndex()); Assertions.assertEquals(entry.getPos() + entry.getSize(), fileStore.getDataFileList().getMaxWrotePosition()); Assertions.assertEquals((index + 1) * DLedgerMmapFileStore.INDEX_UNIT_SIZE, fileStore.getIndexFileList().getMaxWrotePosition()); From 8c2b0264040c69ad75f89d61f2014ed6c1026998 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Tue, 3 Jan 2023 22:53:15 +0800 Subject: [PATCH 2/3] fix: fix some problems in snapshot mode 1. fix some problems in snapshot mode --- .../storage/dledger/DLedgerEntryPusher.java | 1 + .../dledger/snapshot/SnapshotManager.java | 8 ++--- .../snapshot/file/FileSnapshotWriter.java | 2 +- .../statemachine/StateMachineCaller.java | 4 +-- .../dledger/store/DLedgerMemoryStore.java | 22 +++++++++++++ .../storage/dledger/store/DLedgerStore.java | 11 +++---- .../store/file/DLedgerMmapFileStore.java | 28 ++++++++++------- .../dledger/snapshot/SnapshotManagerTest.java | 31 +++++++++---------- .../statemachine/MockStateMachine.java | 4 ++- 9 files changed, 68 insertions(+), 43 deletions(-) diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java index f654302d..f27ff62d 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java @@ -318,6 +318,7 @@ public void doWork() { .collect(Collectors.toList()); long quorumIndex = sortedWaterMarks.get(sortedWaterMarks.size() / 2); final Optional fsmCaller = DLedgerEntryPusher.this.fsmCaller; + if (quorumIndex == this.lastQuorumIndex) return; if (fsmCaller.isPresent()) { // If there exist statemachine DLedgerEntryPusher.this.dLedgerStore.updateCommittedIndex(currTerm, quorumIndex); diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotManager.java b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotManager.java index ef8fb25e..8be5dd0d 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotManager.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotManager.java @@ -43,8 +43,8 @@ public class SnapshotManager { public static final String SNAPSHOT_TEMP_DIR = "tmp"; private DLedgerServer dLedgerServer; - private long lastSnapshotIndex; - private long lastSnapshotTerm; + private long lastSnapshotIndex = -1; + private long lastSnapshotTerm = -1; private final SnapshotStore snapshotStore; private volatile boolean savingSnapshot; private volatile boolean loadingSnapshot; @@ -126,7 +126,7 @@ public void saveSnapshot(DLedgerEntry dLedgerEntry) { return; } // Check if applied index reaching the snapshot threshold - if (dLedgerEntry.getIndex() - this.lastSnapshotIndex <= this.dLedgerServer.getDLedgerConfig().getSnapshotThreshold()) { + if (dLedgerEntry.getIndex() - this.lastSnapshotIndex < this.dLedgerServer.getDLedgerConfig().getSnapshotThreshold()) { return; } // Create snapshot writer @@ -164,7 +164,6 @@ private void saveSnapshotAfter(SnapshotWriter writer, SnapshotMeta snapshotMeta, CompletableFuture.runAsync(() -> { truncatePrefix(dLedgerEntry); }); - //truncatePrefix(dLedgerEntry); } else { logger.error("Unable to save snapshot"); } @@ -222,6 +221,7 @@ private void loadSnapshotAfter(SnapshotReader reader, SnapshotMeta snapshotMeta, this.lastSnapshotIndex = snapshotMeta.getLastIncludedIndex(); this.lastSnapshotTerm = snapshotMeta.getLastIncludedTerm(); this.loadingSnapshot = false; + this.dLedgerServer.getDLedgerStore().updateIndexAfterLoadingSnapshot(this.lastSnapshotIndex, this.lastSnapshotTerm); logger.info("Snapshot {} loaded successfully", snapshotMeta); } else { // Stop the loading process if the snapshot is expired diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/file/FileSnapshotWriter.java b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/file/FileSnapshotWriter.java index 11ef354d..5009cdd3 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/file/FileSnapshotWriter.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/file/FileSnapshotWriter.java @@ -99,6 +99,6 @@ public void setSnapshotMeta(SnapshotMeta snapshotMeta) { } public long getSnapshotIndex() { - return this.snapshotMeta != null ? this.snapshotMeta.getLastIncludedIndex() : 0; + return this.snapshotMeta != null ? this.snapshotMeta.getLastIncludedIndex() : -1; } } diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java b/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java index 67779a9b..e25f33a1 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java @@ -170,11 +170,11 @@ private void doCommitted(final long committedIndex) { if (this.error != null) { return; } - if (this.snapshotManager.isLoadingSnapshot()) { + if (this.snapshotManager.isLoadingSnapshot() || this.snapshotManager.isSavingSnapshot()) { this.scheduledExecutorService.schedule(() -> { try { onCommitted(committedIndex); - logger.info("Still loading snapshot, retry the commit task later"); + logger.info("Still loading or saving snapshot, retry the commit task later"); } catch (Throwable e) { e.printStackTrace(); } diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerMemoryStore.java b/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerMemoryStore.java index 0fe63556..398ed4a4 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerMemoryStore.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerMemoryStore.java @@ -72,6 +72,28 @@ public long truncate(DLedgerEntry entry, long leaderTerm, String leaderId) { return appendAsFollower(entry, leaderTerm, leaderId).getIndex(); } + @Override + public void resetOffsetAfterSnapshot(DLedgerEntry entry) { + + } + + @Override + public void updateIndexAfterLoadingSnapshot(long lastIncludedIndex, long lastIncludedTerm) { + this.ledgerBeforeBeginIndex = lastIncludedIndex; + this.ledgerEndIndex = lastIncludedIndex; + this.ledgerEndTerm = lastIncludedTerm; + } + + @Override + public void startup() { + + } + + @Override + public void shutdown() { + + } + @Override public DLedgerEntry appendAsFollower(DLedgerEntry entry, long leaderTerm, String leaderId) { PreConditions.check(memberState.isFollower(), DLedgerResponseCode.NOT_FOLLOWER); diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerStore.java b/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerStore.java index 013dc087..9b0b0bcb 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerStore.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerStore.java @@ -59,15 +59,12 @@ public long truncate(DLedgerEntry entry, long leaderTerm, String leaderId) { return -1; } - public void resetOffsetAfterSnapshot(DLedgerEntry entry) { + public abstract void resetOffsetAfterSnapshot(DLedgerEntry entry); - } - - public void startup() { + public abstract void updateIndexAfterLoadingSnapshot(long lastIncludedIndex, long lastIncludedTerm); - } + public abstract void startup(); - public void shutdown() { + public abstract void shutdown(); - } } diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java b/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java index a8c5375d..f61b1c36 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java @@ -51,7 +51,7 @@ public class DLedgerMmapFileStore extends DLedgerStore { private static final Logger LOGGER = LoggerFactory.getLogger(DLedgerMmapFileStore.class); public List appendHooks = new ArrayList<>(); - private long ledgerBeforeBeginIndex = -1; + private volatile long ledgerBeforeBeginIndex = -1; private long ledgerBeginIndex = -1; private long ledgerEndIndex = -1; private long committedIndex = -1; @@ -303,7 +303,6 @@ public void recover() { DLedgerEntry entry = get(lastEntryIndex); PreConditions.check(entry != null, DLedgerResponseCode.DISK_ERROR, "recheck get null entry"); PreConditions.check(entry.getIndex() == lastEntryIndex, DLedgerResponseCode.DISK_ERROR, "recheck index %d != %d", entry.getIndex(), lastEntryIndex); - reviseLedgerBeforeBeginIndex(); } this.dataFileList.updateWherePosition(processOffset); this.dataFileList.truncateOffset(processOffset); @@ -346,7 +345,6 @@ private void reviseLedgerBeginIndex() { private void reviseLedgerBeforeBeginIndex() { // get ledger begin index - System.out.println(this.memberState.getSelfId() + " start to revise before index, now before index = " + this.ledgerBeforeBeginIndex); MmapFile firstFile = dataFileList.getFirstMappedFile(); SelectMmapBufferResult sbr = firstFile.selectMappedBuffer(0); try { @@ -360,7 +358,6 @@ private void reviseLedgerBeforeBeginIndex() { } // begin index long beginIndex = tmpBuffer.getLong(); - System.out.println(this.memberState.getSelfId() + " update before index from " + this.ledgerBeginIndex + " to " + (beginIndex - 1)); this.ledgerBeforeBeginIndex = beginIndex - 1; indexFileList.resetOffset(beginIndex * INDEX_UNIT_SIZE); } finally { @@ -501,14 +498,21 @@ public void resetOffsetAfterSnapshot(DLedgerEntry entry) { if (entry.getIndex() <= this.ledgerBeforeBeginIndex) { return; } - System.out.println(this.memberState.getSelfId() + " reset offset after snapshot, now before index = " + this.ledgerBeforeBeginIndex + ", snapshot last included index = " + entry.getIndex()); - long resetPos = entry.getPos() + entry.getSize(); - dataFileList.resetOffset(resetPos); - long resetIndexOffset = entry.getIndex() * INDEX_UNIT_SIZE; - indexFileList.resetOffset(resetIndexOffset); - // reset ledgerBeforeBeginIndex - System.out.println(this.memberState.getSelfId() + " update before index from " + this.ledgerBeginIndex + " to " + entry.getIndex()); - this.ledgerBeforeBeginIndex = entry.getIndex(); + synchronized (this.memberState) { + long resetPos = entry.getPos() + entry.getSize(); + dataFileList.resetOffset(resetPos); + long resetIndexOffset = entry.getIndex() * INDEX_UNIT_SIZE; + indexFileList.resetOffset(resetIndexOffset); + // reset ledgerBeforeBeginIndex + this.ledgerBeforeBeginIndex = entry.getIndex(); + } + } + + @Override + public void updateIndexAfterLoadingSnapshot(long lastIncludedIndex, long lastIncludedTerm) { + this.ledgerBeforeBeginIndex = lastIncludedIndex; + this.ledgerEndIndex = lastIncludedIndex; + this.ledgerEndTerm = lastIncludedTerm; } @Override diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotManagerTest.java b/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotManagerTest.java index 0cb27602..bd9b11c0 100644 --- a/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotManagerTest.java +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotManagerTest.java @@ -41,33 +41,33 @@ public void testSaveAndLoadSnapshot() throws InterruptedException { }; // Launch client DLedgerClient dLedgerClient = launchClient(group, peers.split(";")[0]); - for (int i = 0; i < 100; i++) { + // append 99 entries, each 10 entries will trigger one snapshotting + for (int i = 0; i < 99; i++) { AppendEntryResponse appendEntryResponse = dLedgerClient.append(new byte[512]); assertEquals(DLedgerResponseCode.SUCCESS.getCode(), appendEntryResponse.getCode()); assertEquals(i, appendEntryResponse.getIndex()); } - Thread.sleep(5000); + Thread.sleep(2000); for (DLedgerServer server : serverList) { - assertEquals(99, server.getDLedgerStore().getLedgerEndIndex()); - assertEquals(99, server.getDLedgerStore().getLedgerBeforeBeginIndex()); + assertEquals(98, server.getDLedgerStore().getLedgerEndIndex()); + assertEquals(89, server.getDLedgerStore().getLedgerBeforeBeginIndex()); // check statemachine final MockStateMachine fsm = (MockStateMachine) server.getStateMachine(); - assertEquals(99, fsm.getAppliedIndex()); - assertEquals(100, fsm.getTotalEntries()); + assertEquals(99, fsm.getTotalEntries()); } + // now we append an entry will trigger the snapshotting + // this time will delete entries on a scale of 90 to 99 AppendEntryResponse appendEntryResponse = dLedgerClient.append(new byte[512]); assertEquals(DLedgerResponseCode.SUCCESS.getCode(), appendEntryResponse.getCode()); - assertEquals(100, appendEntryResponse.getIndex()); - - Thread.sleep(5000); + assertEquals(99, appendEntryResponse.getIndex()); + Thread.sleep(2000); for (DLedgerServer server : serverList) { - assertEquals(100, server.getDLedgerStore().getLedgerEndIndex()); + assertEquals(99, server.getDLedgerStore().getLedgerEndIndex()); assertEquals(99, server.getDLedgerStore().getLedgerBeforeBeginIndex()); // check statemachine final MockStateMachine fsm = (MockStateMachine) server.getStateMachine(); - assertEquals(100, fsm.getAppliedIndex()); - assertEquals(101, fsm.getTotalEntries()); + assertEquals(100, fsm.getTotalEntries()); } Thread.sleep(100); @@ -83,15 +83,14 @@ public void testSaveAndLoadSnapshot() throws InterruptedException { serverList.add(newDLedgerServer0); serverList.add(newDLedgerServer1); serverList.add(newDLedgerServer2); - Thread.sleep(5000); + Thread.sleep(2000); // State machine could only be recovered from snapshot due to the entry has been removed after saving snapshot for (DLedgerServer server : serverList) { - assertEquals(100, server.getDLedgerStore().getLedgerEndIndex()); + assertEquals(99, server.getDLedgerStore().getLedgerEndIndex()); assertEquals(99, server.getDLedgerStore().getLedgerBeforeBeginIndex()); // check statemachine final MockStateMachine fsm = (MockStateMachine) server.getStateMachine(); - assertEquals(100, fsm.getAppliedIndex()); - assertEquals(101, fsm.getTotalEntries()); + assertEquals(100, fsm.getTotalEntries()); } } diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/statemachine/MockStateMachine.java b/dledger/src/test/java/io/openmessaging/storage/dledger/statemachine/MockStateMachine.java index dcddd9de..2fa33bd8 100644 --- a/dledger/src/test/java/io/openmessaging/storage/dledger/statemachine/MockStateMachine.java +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/statemachine/MockStateMachine.java @@ -31,7 +31,6 @@ public class MockStateMachine implements StateMachine { private static Logger logger = LoggerFactory.getLogger(MockStateMachine.class); private volatile long appliedIndex = -1L; private final AtomicLong totalEntries = new AtomicLong(0); - private final AtomicLong lastAppliedIndex = new AtomicLong(-1); @Override public void onApply(final CommittedEntryIterator iter) { @@ -43,6 +42,8 @@ public void onApply(final CommittedEntryIterator iter) { } this.totalEntries.addAndGet(1); this.appliedIndex = next.getIndex(); + System.out.println("apply index: " + next.getIndex()); + System.out.println("total entries: " + this.totalEntries.get()); } } } @@ -51,6 +52,7 @@ public void onApply(final CommittedEntryIterator iter) { public boolean onSnapshotSave(final SnapshotWriter writer) { long curEntryCnt = this.totalEntries.get(); MockSnapshotFile snapshotFile = new MockSnapshotFile(writer.getSnapshotStorePath() + File.separator + SnapshotManager.SNAPSHOT_DATA_FILE); + System.out.println("save snapshot, total entries: " + curEntryCnt); return snapshotFile.save(curEntryCnt); } From 0ebdeaef312e2e9615ddcd16717bd380f60b7d61 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Tue, 3 Jan 2023 23:45:06 +0800 Subject: [PATCH 3/3] fix: fix some problems in snapshot mode 1. fix some problems in snapshot mode --- .../storage/dledger/DLedgerEntryPusher.java | 12 +++++++----- .../dledger/store/file/DLedgerMmapFileStore.java | 1 + .../dledger/store/DLedgerMappedFileStoreTest.java | 4 ++-- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java index f27ff62d..8dfa41ec 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java @@ -318,13 +318,13 @@ public void doWork() { .collect(Collectors.toList()); long quorumIndex = sortedWaterMarks.get(sortedWaterMarks.size() / 2); final Optional fsmCaller = DLedgerEntryPusher.this.fsmCaller; - if (quorumIndex == this.lastQuorumIndex) return; if (fsmCaller.isPresent()) { // If there exist statemachine - DLedgerEntryPusher.this.dLedgerStore.updateCommittedIndex(currTerm, quorumIndex); final StateMachineCaller caller = fsmCaller.get(); - caller.onCommitted(quorumIndex); - + if (quorumIndex > this.lastQuorumIndex) { + DLedgerEntryPusher.this.dLedgerStore.updateCommittedIndex(currTerm, quorumIndex); + caller.onCommitted(quorumIndex); + } // Check elapsed if (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000) { updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex()); @@ -336,7 +336,9 @@ public void doWork() { waitForRunning(1); } } else { - dLedgerStore.updateCommittedIndex(currTerm, quorumIndex); + if (quorumIndex > this.lastQuorumIndex) { + dLedgerStore.updateCommittedIndex(currTerm, quorumIndex); + } ConcurrentMap> responses = pendingAppendResponsesByTerm.get(currTerm); boolean needCheck = false; int ackNum = 0; diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java b/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java index f61b1c36..c9a08366 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java @@ -303,6 +303,7 @@ public void recover() { DLedgerEntry entry = get(lastEntryIndex); PreConditions.check(entry != null, DLedgerResponseCode.DISK_ERROR, "recheck get null entry"); PreConditions.check(entry.getIndex() == lastEntryIndex, DLedgerResponseCode.DISK_ERROR, "recheck index %d != %d", entry.getIndex(), lastEntryIndex); + reviseLedgerBeforeBeginIndex(); } this.dataFileList.updateWherePosition(processOffset); this.dataFileList.truncateOffset(processOffset); diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/store/DLedgerMappedFileStoreTest.java b/dledger/src/test/java/io/openmessaging/storage/dledger/store/DLedgerMappedFileStoreTest.java index ff43f188..45bf12a6 100644 --- a/dledger/src/test/java/io/openmessaging/storage/dledger/store/DLedgerMappedFileStoreTest.java +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/store/DLedgerMappedFileStoreTest.java @@ -302,7 +302,7 @@ public void testResetOffsetAndRecover() { Assertions.assertEquals(i, resEntry.getIndex()); } Assertions.assertEquals(10, fileStore.getDataFileList().getMappedFiles().size()); - Assertions.assertEquals(0, fileStore.getLedgerBeginIndex()); + Assertions.assertEquals(-1, fileStore.getLedgerBeforeBeginIndex()); Assertions.assertEquals(9, fileStore.getLedgerEndIndex()); // reset offset, discard the first 9 entries @@ -324,7 +324,7 @@ public void testResetOffsetAndRecover() { fileStore.shutdown(); fileStore = createFileStore(group, peers, "n0", "n0", 1024, 1024, 0); Assertions.assertEquals(1, fileStore.getDataFileList().getMappedFiles().size()); - Assertions.assertEquals(9, fileStore.getLedgerBeginIndex()); + Assertions.assertEquals(8, fileStore.getLedgerBeforeBeginIndex()); Assertions.assertEquals(9, fileStore.getLedgerEndIndex()); }