diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java index 246b435b..244e107e 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java @@ -91,6 +91,9 @@ public class DLedgerConfig { private long leadershipTransferWaitTimeout = 1000; + private int snapshotThreshold = 1000; + private int maxSnapshotReservedNum = 3; + public String getDefaultPath() { return storeBaseDir + File.separator + "dledger-" + selfId; } @@ -106,6 +109,10 @@ public void setDataStorePath(String dataStorePath) { this.dataStorePath = dataStorePath; } + public String getSnapshotStoreBaseDir() { + return getDefaultPath() + File.separator + "snapshot"; + } + public String getIndexStorePath() { return getDefaultPath() + File.separator + "index"; } @@ -463,4 +470,19 @@ public Map getPeerAddressMap() { return this.peerAddressMap; } + public int getSnapshotThreshold() { + return snapshotThreshold; + } + + public void setSnapshotThreshold(int snapshotThreshold) { + this.snapshotThreshold = snapshotThreshold; + } + + public int getMaxSnapshotReservedNum() { + return maxSnapshotReservedNum; + } + + public void setMaxSnapshotReservedNum(int maxSnapshotReservedNum) { + this.maxSnapshotReservedNum = maxSnapshotReservedNum; + } } diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java index a4a60471..d5fda4a3 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java @@ -36,6 +36,7 @@ import io.openmessaging.storage.dledger.protocol.PushEntryResponse; import io.openmessaging.storage.dledger.protocol.VoteRequest; import io.openmessaging.storage.dledger.protocol.VoteResponse; +import io.openmessaging.storage.dledger.snapshot.SnapshotManager; import io.openmessaging.storage.dledger.statemachine.StateMachine; import io.openmessaging.storage.dledger.statemachine.StateMachineCaller; import io.openmessaging.storage.dledger.store.DLedgerMemoryStore; @@ -139,6 +140,11 @@ public DLedgerServer(DLedgerConfig dLedgerConfig, DLedgerRpcService dLedgerRpcSe public synchronized void startup() { if (!isStarted) { this.dLedgerStore.startup(); + this.fsmCaller.ifPresent(x -> { + // Start state machine caller and load existing snapshots for data recovery + x.start(); + x.getSnapshotManager().loadSnapshot(); + }); if (RpcServiceMode.EXCLUSIVE.equals(this.rpcServiceMode)) { this.dLedgerRpcService.startup(); } @@ -183,7 +189,7 @@ public synchronized void registerStateMachine(final StateMachine fsm) { throw new IllegalStateException("can not register statemachine after dledger server starts"); } final StateMachineCaller fsmCaller = new StateMachineCaller(this.dLedgerStore, fsm, this.dLedgerEntryPusher); - fsmCaller.start(); + fsmCaller.registerSnapshotManager(new SnapshotManager(this)); this.fsmCaller = Optional.of(fsmCaller); // Register state machine caller to entry pusher this.dLedgerEntryPusher.registerStateMachine(this.fsmCaller); @@ -546,6 +552,10 @@ public NettyRemotingClient getRemotingClient() { return null; } + public StateMachineCaller getFsmCaller() { + return fsmCaller.orElseThrow(NullPointerException::new); + } + public boolean isLeader() { return this.memberState.isLeader(); } diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerResponseCode.java b/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerResponseCode.java index 6022bdca..be4c2f19 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerResponseCode.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerResponseCode.java @@ -54,7 +54,8 @@ public enum DLedgerResponseCode { LEADER_PENDING_FULL(503, ""), ILLEGAL_MEMBER_STATE(504, ""), LEADER_NOT_READY(505, ""), - LEADER_TRANSFERRING(506, ""); + LEADER_TRANSFERRING(506, ""), + LOAD_SNAPSHOT_ERROR(507, ""); private static Map codeMap = new HashMap<>(); diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotManager.java b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotManager.java new file mode 100644 index 00000000..a537287f --- /dev/null +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotManager.java @@ -0,0 +1,261 @@ +/* + * Copyright 2017-2022 The DLedger Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.storage.dledger.snapshot; + +import io.openmessaging.storage.dledger.DLedgerConfig; +import io.openmessaging.storage.dledger.DLedgerServer; +import io.openmessaging.storage.dledger.entry.DLedgerEntry; +import io.openmessaging.storage.dledger.exception.DLedgerException; +import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode; +import io.openmessaging.storage.dledger.snapshot.file.FileSnapshotStore; +import io.openmessaging.storage.dledger.snapshot.hook.LoadSnapshotHook; +import io.openmessaging.storage.dledger.snapshot.hook.SaveSnapshotHook; +import io.openmessaging.storage.dledger.utils.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; + +public class SnapshotManager { + + private static Logger logger = LoggerFactory.getLogger(SnapshotManager.class); + + public static final String SNAPSHOT_META_FILE = "snapshot_meta"; + public static final String SNAPSHOT_DATA_FILE = "data"; + public static final String SNAPSHOT_DIR_PREFIX = "snapshot_"; + public static final String SNAPSHOT_TEMP_DIR = "tmp"; + + private DLedgerServer dLedgerServer; + private long lastSnapshotIndex; + private long lastSnapshotTerm; + private final SnapshotStore snapshotStore; + private volatile boolean savingSnapshot; + private volatile boolean loadingSnapshot; + + public SnapshotManager(DLedgerServer dLedgerServer) { + this.dLedgerServer = dLedgerServer; + this.snapshotStore = new FileSnapshotStore(this.dLedgerServer.getDLedgerConfig().getSnapshotStoreBaseDir()); + } + + public boolean isSavingSnapshot() { + return savingSnapshot; + } + + public boolean isLoadingSnapshot() { + return loadingSnapshot; + } + + private class SaveSnapshotAfterHook implements SaveSnapshotHook { + + SnapshotWriter writer; + DLedgerEntry dLedgerEntry; + SnapshotMeta snapshotMeta; + + public SaveSnapshotAfterHook(SnapshotWriter writer, DLedgerEntry dLedgerEntry) { + this.writer = writer; + this.dLedgerEntry = dLedgerEntry; + } + + @Override + public void doCallBack(SnapshotStatus status) { + saveSnapshotAfter(writer, snapshotMeta, dLedgerEntry, status); + } + + @Override + public void registerSnapshotMeta(SnapshotMeta snapshotMeta) { + this.snapshotMeta = snapshotMeta; + this.writer.setSnapshotMeta(snapshotMeta); + } + + @Override + public SnapshotWriter getSnapshotWriter() { + return this.writer; + } + + @Override + public DLedgerEntry getSnapshotEntry() { + return this.dLedgerEntry; + } + } + + private class LoadSnapshotAfterHook implements LoadSnapshotHook { + + SnapshotReader reader; + SnapshotMeta snapshotMeta; + + public LoadSnapshotAfterHook(SnapshotReader reader) { + this.reader = reader; + } + + @Override + public void doCallBack(SnapshotStatus status) { + loadSnapshotAfter(reader, snapshotMeta, status); + } + + @Override + public void registerSnapshotMeta(SnapshotMeta snapshotMeta) { + this.snapshotMeta = snapshotMeta; + } + + @Override + public SnapshotReader getSnapshotReader() { + return this.reader; + } + } + + public void saveSnapshot(DLedgerEntry dLedgerEntry) { + // Check if still saving other snapshots + if (this.savingSnapshot) { + return; + } + // Check if applied index reaching the snapshot threshold + if (dLedgerEntry.getIndex() - this.lastSnapshotIndex <= this.dLedgerServer.getDLedgerConfig().getSnapshotThreshold()) { + return; + } + // Create snapshot writer + SnapshotWriter writer = this.snapshotStore.createSnapshotWriter(); + if (writer == null) { + return; + } + // Start saving snapshot + this.savingSnapshot = true; + SaveSnapshotAfterHook saveSnapshotAfter = new SaveSnapshotAfterHook(writer, dLedgerEntry); + if (!this.dLedgerServer.getFsmCaller().onSnapshotSave(saveSnapshotAfter)) { + logger.error("Unable to call statemachine onSnapshotSave"); + saveSnapshotAfter.doCallBack(SnapshotStatus.FAIL); + } + } + + private void saveSnapshotAfter(SnapshotWriter writer, SnapshotMeta snapshotMeta, DLedgerEntry dLedgerEntry, SnapshotStatus status) { + int res = status.getCode(); + // Update snapshot meta + if (res == SnapshotStatus.SUCCESS.getCode()) { + writer.setSnapshotMeta(snapshotMeta); + } + // Write snapshot meta into files and close snapshot writer + try { + writer.save(status); + } catch (IOException e) { + logger.error("Unable to close snapshot writer", e); + res = SnapshotStatus.FAIL.getCode(); + } + if (res == SnapshotStatus.SUCCESS.getCode()) { + this.lastSnapshotIndex = snapshotMeta.getLastIncludedIndex(); + this.lastSnapshotTerm = snapshotMeta.getLastIncludedTerm(); + logger.info("Snapshot {} saved successfully", snapshotMeta); + // Remove previous logs + CompletableFuture.runAsync(() -> { + truncatePrefix(dLedgerEntry); + }); + } else { + logger.error("Unable to save snapshot"); + } + this.savingSnapshot = false; + } + + private void truncatePrefix(DLedgerEntry entry) { + deleteExpiredSnapshot(); + this.dLedgerServer.getFsmCaller().getdLedgerStore().resetOffsetAfterSnapshot(entry); + } + + private void deleteExpiredSnapshot() { + // Remove the oldest snapshot + DLedgerConfig config = dLedgerServer.getDLedgerConfig(); + File[] snapshotFiles = new File(config.getSnapshotStoreBaseDir()).listFiles(); + if (snapshotFiles != null && snapshotFiles.length > config.getMaxSnapshotReservedNum()) { + long minSnapshotIdx = Long.MAX_VALUE; + for (File file : snapshotFiles) { + String fileName = file.getName(); + if (!fileName.startsWith(SnapshotManager.SNAPSHOT_DIR_PREFIX)) { + continue; + } + minSnapshotIdx = Math.min(Long.parseLong(fileName.substring(SnapshotManager.SNAPSHOT_DIR_PREFIX.length())), minSnapshotIdx); + } + String deleteFilePath = config.getSnapshotStoreBaseDir() + File.separator + SnapshotManager.SNAPSHOT_DIR_PREFIX + minSnapshotIdx; + try { + IOUtils.deleteFile(new File(deleteFilePath)); + } catch (IOException e) { + logger.error("Unable to remove expired snapshot: {}", deleteFilePath, e); + } + } + } + + public void loadSnapshot() { + // Check if still loading snapshot + if (loadingSnapshot) { + return; + } + // Create snapshot reader + SnapshotReader reader = snapshotStore.createSnapshotReader(); + if (reader == null) { + return; + } + // Start loading snapshot + this.loadingSnapshot = true; + LoadSnapshotAfterHook loadSnapshotAfter = new LoadSnapshotAfterHook(reader); + if (!this.dLedgerServer.getFsmCaller().onSnapshotLoad(loadSnapshotAfter)) { + this.dLedgerServer.getFsmCaller().setError(this.dLedgerServer, + new DLedgerException(DLedgerResponseCode.LOAD_SNAPSHOT_ERROR, "Unable to call statemachine onSnapshotLoad")); + } + } + + private void loadSnapshotAfter(SnapshotReader reader, SnapshotMeta snapshotMeta, SnapshotStatus status) { + if (status.getCode() == SnapshotStatus.SUCCESS.getCode()) { + this.lastSnapshotIndex = snapshotMeta.getLastIncludedIndex(); + this.lastSnapshotTerm = snapshotMeta.getLastIncludedTerm(); + this.loadingSnapshot = false; + logger.info("Snapshot {} loaded successfully", snapshotMeta); + } else { + // Stop the loading process if the snapshot is expired + if (status.getCode() == SnapshotStatus.EXPIRED.getCode()) { + this.loadingSnapshot = false; + return; + } + // Remove the error snapshot + boolean failed = false; + try { + IOUtils.deleteFile(new File(reader.getSnapshotStorePath())); + } catch (IOException e) { + logger.error("Unable to remove error snapshot: {}", reader.getSnapshotStorePath(), e); + failed = true; + } + // Check if there is snapshot exists + DLedgerConfig config = this.dLedgerServer.getDLedgerConfig(); + if (Objects.requireNonNull(new File(config.getSnapshotStoreBaseDir()).listFiles()).length == 0) { + logger.error("No snapshot for recovering state machine: {}", config.getSnapshotStoreBaseDir()); + failed = true; + } + if (failed) { + // Still able to recover from files if the beginning index of file store is 0 + if (this.dLedgerServer.getFsmCaller().getdLedgerStore().getLedgerBeginIndex() == 0) { + this.loadingSnapshot = false; + return; + } + this.dLedgerServer.getFsmCaller().setError(this.dLedgerServer, + new DLedgerException(DLedgerResponseCode.LOAD_SNAPSHOT_ERROR, "Fail to recover state machine")); + return; + } + // Retry loading the previous snapshots + logger.warn("Load snapshot from {} failed. Start recovering from the previous snapshot", reader.getSnapshotStorePath()); + this.loadingSnapshot = false; + loadSnapshot(); + } + } +} diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotMeta.java b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotMeta.java new file mode 100644 index 00000000..375ff1d2 --- /dev/null +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotMeta.java @@ -0,0 +1,52 @@ +/* + * Copyright 2017-2022 The DLedger Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.storage.dledger.snapshot; + +public class SnapshotMeta { + + private long lastIncludedIndex; + private long lastIncludedTerm; + + public SnapshotMeta(long lastIncludedIndex, long lastIncludedTerm) { + this.lastIncludedIndex = lastIncludedIndex; + this.lastIncludedTerm = lastIncludedTerm; + } + + public long getLastIncludedIndex() { + return lastIncludedIndex; + } + + public void setLastIncludedIndex(int lastIncludedIndex) { + this.lastIncludedIndex = lastIncludedIndex; + } + + public long getLastIncludedTerm() { + return lastIncludedTerm; + } + + public void setLastIncludedTerm(int lastIncludedTerm) { + this.lastIncludedTerm = lastIncludedTerm; + } + + @Override + public String toString() { + return "SnapshotMeta{" + + "lastIncludedIndex=" + lastIncludedIndex + + ", lastIncludedTerm=" + lastIncludedTerm + + '}'; + } +} diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotReader.java b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotReader.java index ca7fb254..061586c4 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotReader.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotReader.java @@ -16,8 +16,16 @@ package io.openmessaging.storage.dledger.snapshot; +import java.io.IOException; + /** * Reader for snapshot */ public interface SnapshotReader { + + SnapshotMeta load() throws IOException; + + SnapshotMeta getSnapshotMeta(); + + String getSnapshotStorePath(); } diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotStatus.java b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotStatus.java new file mode 100644 index 00000000..6df6ffdb --- /dev/null +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotStatus.java @@ -0,0 +1,55 @@ +/* + * Copyright 2017-2022 The DLedger Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.storage.dledger.snapshot; + +import java.util.HashMap; +import java.util.Map; + +public enum SnapshotStatus { + + UNKNOWN(-1), + SUCCESS(0), + FAIL(10001), + EXPIRED(10002); + + private static Map codeMap = new HashMap<>(); + + static { + for (SnapshotStatus status : SnapshotStatus.values()) { + codeMap.put(status.code, status); + } + } + + private int code; + + SnapshotStatus(int code) { + this.code = code; + } + + public static SnapshotStatus valueOf(int code) { + SnapshotStatus tmp = codeMap.get(code); + if (tmp != null) { + return tmp; + } else { + return UNKNOWN; + } + } + + public int getCode() { + return code; + } +} diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotStore.java b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotStore.java new file mode 100644 index 00000000..e5d1e18a --- /dev/null +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotStore.java @@ -0,0 +1,24 @@ +/* + * Copyright 2017-2022 The DLedger Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.storage.dledger.snapshot; + +public interface SnapshotStore { + + SnapshotWriter createSnapshotWriter(); + + SnapshotReader createSnapshotReader(); +} diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotWriter.java b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotWriter.java index f454d011..f99c0164 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotWriter.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/SnapshotWriter.java @@ -16,8 +16,16 @@ package io.openmessaging.storage.dledger.snapshot; +import java.io.IOException; + /** * Writer for snapshot */ public interface SnapshotWriter { + + void save(SnapshotStatus status) throws IOException; + + void setSnapshotMeta(SnapshotMeta snapshotMeta); + + String getSnapshotStorePath(); } diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/file/FileSnapshotReader.java b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/file/FileSnapshotReader.java new file mode 100644 index 00000000..7afc6738 --- /dev/null +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/file/FileSnapshotReader.java @@ -0,0 +1,61 @@ +/* + * Copyright 2017-2022 The DLedger Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.storage.dledger.snapshot.file; + +import com.alibaba.fastjson.JSON; +import io.openmessaging.storage.dledger.snapshot.SnapshotManager; +import io.openmessaging.storage.dledger.snapshot.SnapshotMeta; +import io.openmessaging.storage.dledger.snapshot.SnapshotReader; +import io.openmessaging.storage.dledger.utils.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +public class FileSnapshotReader implements SnapshotReader { + + private static Logger logger = LoggerFactory.getLogger(FileSnapshotReader.class); + + private final String snapshotStorePath; + private SnapshotMeta snapshotMeta; + + public FileSnapshotReader(String snapshotStorePath) { + this.snapshotStorePath = snapshotStorePath; + } + + @Override + public SnapshotMeta load() throws IOException { + SnapshotMeta snapshotMetaFromJSON = JSON.parseObject(IOUtils.file2String(this.snapshotStorePath + + File.separator + SnapshotManager.SNAPSHOT_META_FILE), SnapshotMeta.class); + if (snapshotMetaFromJSON == null) { + return null; + } + this.snapshotMeta = snapshotMetaFromJSON; + return snapshotMeta; + } + + @Override + public SnapshotMeta getSnapshotMeta() { + return this.snapshotMeta != null ? this.snapshotMeta : null; + } + + @Override + public String getSnapshotStorePath() { + return this.snapshotStorePath; + } +} diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/file/FileSnapshotStore.java b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/file/FileSnapshotStore.java new file mode 100644 index 00000000..20f18182 --- /dev/null +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/file/FileSnapshotStore.java @@ -0,0 +1,115 @@ +/* + * Copyright 2017-2022 The DLedger Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.storage.dledger.snapshot.file; + +import io.openmessaging.storage.dledger.snapshot.SnapshotManager; +import io.openmessaging.storage.dledger.snapshot.SnapshotReader; +import io.openmessaging.storage.dledger.snapshot.SnapshotStore; +import io.openmessaging.storage.dledger.snapshot.SnapshotWriter; +import io.openmessaging.storage.dledger.utils.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +public class FileSnapshotStore implements SnapshotStore { + + private static Logger logger = LoggerFactory.getLogger(FileSnapshotStore.class); + + private final String snapshotStoreBaseDir; + + public FileSnapshotStore(String snapshotStoreBaseDir) { + this.snapshotStoreBaseDir = snapshotStoreBaseDir; + initStore(); + } + + private void initStore() { + // Create snapshot storage if the statemachine is first-time registered + File dir = new File(this.snapshotStoreBaseDir); + try { + IOUtils.mkDir(dir); + } catch (IOException e) { + logger.error("Unable to create snapshot storage directory {}", this.snapshotStoreBaseDir, e); + throw new RuntimeException(e); + } + // Clean temp directory to remove existing dirty snapshots + File tmpSnapshot = new File(this.snapshotStoreBaseDir + File.separator + SnapshotManager.SNAPSHOT_TEMP_DIR); + if (tmpSnapshot.exists()) { + try { + IOUtils.deleteFile(tmpSnapshot); + } catch (IOException e) { + logger.error("Unable to clean temp snapshots {}", tmpSnapshot.getPath(), e); + throw new RuntimeException(e); + } + } + } + + @Override + public SnapshotWriter createSnapshotWriter() { + // Delete temp snapshot + String tmpSnapshotStorePath = this.snapshotStoreBaseDir + File.separator + SnapshotManager.SNAPSHOT_TEMP_DIR; + if (new File(tmpSnapshotStorePath).exists()) { + try { + IOUtils.deleteFile(new File(tmpSnapshotStorePath)); + } catch (IOException e) { + logger.error("Unable to delete temp snapshot: {}", tmpSnapshotStorePath, e); + return null; + } + } + // Create tmp directory for writing snapshots + File dir = new File(tmpSnapshotStorePath); + try { + IOUtils.mkDir(dir); + } catch (IOException e) { + logger.error("Unable to create snapshot storage directory: " + tmpSnapshotStorePath, e); + return null; + } + return new FileSnapshotWriter(tmpSnapshotStorePath, this); + } + + @Override + public SnapshotReader createSnapshotReader() { + long lastSnapshotIndex = getLastSnapshotIdx(); + if (lastSnapshotIndex == -1) { + logger.warn("No snapshot exists"); + return null; + } + String snapshotStorePath = this.snapshotStoreBaseDir + File.separator + + SnapshotManager.SNAPSHOT_DIR_PREFIX + lastSnapshotIndex; + return new FileSnapshotReader(snapshotStorePath); + } + + private long getLastSnapshotIdx() { + File[] snapshotFiles = new File(this.snapshotStoreBaseDir).listFiles(); + long lastSnapshotIdx = -1; + if (snapshotFiles != null && snapshotFiles.length > 0) { + for (File snapshotFile : snapshotFiles) { + String fileName = snapshotFile.getName(); + if (!fileName.startsWith(SnapshotManager.SNAPSHOT_DIR_PREFIX)) { + continue; + } + lastSnapshotIdx = Math.max(Long.parseLong(fileName.substring(SnapshotManager.SNAPSHOT_DIR_PREFIX.length())), lastSnapshotIdx); + } + } + return lastSnapshotIdx; + } + + public String getSnapshotStoreBaseDir() { + return snapshotStoreBaseDir; + } +} diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/file/FileSnapshotWriter.java b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/file/FileSnapshotWriter.java new file mode 100644 index 00000000..11ef354d --- /dev/null +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/file/FileSnapshotWriter.java @@ -0,0 +1,104 @@ +/* + * Copyright 2017-2022 The DLedger Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.storage.dledger.snapshot.file; + +import com.alibaba.fastjson.JSON; +import io.openmessaging.storage.dledger.snapshot.SnapshotManager; +import io.openmessaging.storage.dledger.snapshot.SnapshotMeta; +import io.openmessaging.storage.dledger.snapshot.SnapshotStatus; +import io.openmessaging.storage.dledger.snapshot.SnapshotWriter; +import io.openmessaging.storage.dledger.utils.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +public class FileSnapshotWriter implements SnapshotWriter { + + private static Logger logger = LoggerFactory.getLogger(FileSnapshotWriter.class); + + private final String snapshotStorePath; + private final FileSnapshotStore snapshotStore; + private SnapshotMeta snapshotMeta; + + public FileSnapshotWriter(String snapshotStorePath, FileSnapshotStore snapshotStore) { + this.snapshotStorePath = snapshotStorePath; + this.snapshotStore = snapshotStore; + } + + @Override + public void save(SnapshotStatus status) throws IOException { + int res = status.getCode(); + IOException ioe = null; + do { + if (res != SnapshotStatus.SUCCESS.getCode()) { + break; + } + try { + sync(); + } catch (IOException e) { + logger.error("Unable to sync writer: {}", this.snapshotStorePath, e); + res = SnapshotStatus.FAIL.getCode(); + ioe = e; + break; + } + // Move temp to new + long snapshotIdx = getSnapshotIndex(); + String tmpPath = this.snapshotStorePath; + String officialPath = this.snapshotStore.getSnapshotStoreBaseDir() + File.separator + SnapshotManager.SNAPSHOT_DIR_PREFIX + snapshotIdx; + try { + IOUtils.atomicMvFile(new File(tmpPath), new File(officialPath)); + } catch (IOException e) { + logger.error("Unable to move temp snapshot from {} to {}", tmpPath, officialPath); + res = SnapshotStatus.FAIL.getCode(); + ioe = e; + break; + } + } while (false); + // If the writing process failed, delete the temp snapshot + if (res != SnapshotStatus.SUCCESS.getCode()) { + try { + IOUtils.deleteFile(new File(this.snapshotStorePath)); + } catch (IOException e) { + logger.error("Unable to delete temp snapshot: {}", this.snapshotStorePath, e); + ioe = e; + } + } + if (ioe != null) { + throw ioe; + } + } + + private void sync() throws IOException { + IOUtils.string2File(JSON.toJSONString(this.snapshotMeta), this.snapshotStorePath + File.separator + SnapshotManager.SNAPSHOT_META_FILE); + } + + @Override + public String getSnapshotStorePath() { + return this.snapshotStorePath; + } + + @Override + public void setSnapshotMeta(SnapshotMeta snapshotMeta) { + this.snapshotMeta = snapshotMeta; + } + + public long getSnapshotIndex() { + return this.snapshotMeta != null ? this.snapshotMeta.getLastIncludedIndex() : 0; + } +} diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/hook/LoadSnapshotHook.java b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/hook/LoadSnapshotHook.java new file mode 100644 index 00000000..acd45fb1 --- /dev/null +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/hook/LoadSnapshotHook.java @@ -0,0 +1,27 @@ +/* + * Copyright 2017-2022 The DLedger Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.storage.dledger.snapshot.hook; + +import io.openmessaging.storage.dledger.snapshot.SnapshotMeta; +import io.openmessaging.storage.dledger.snapshot.SnapshotReader; + +public interface LoadSnapshotHook extends SnapshotHook { + + void registerSnapshotMeta(SnapshotMeta snapshotMeta); + + SnapshotReader getSnapshotReader(); +} diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/hook/SaveSnapshotHook.java b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/hook/SaveSnapshotHook.java new file mode 100644 index 00000000..337533ca --- /dev/null +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/hook/SaveSnapshotHook.java @@ -0,0 +1,30 @@ +/* + * Copyright 2017-2022 The DLedger Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.storage.dledger.snapshot.hook; + +import io.openmessaging.storage.dledger.entry.DLedgerEntry; +import io.openmessaging.storage.dledger.snapshot.SnapshotMeta; +import io.openmessaging.storage.dledger.snapshot.SnapshotWriter; + +public interface SaveSnapshotHook extends SnapshotHook { + + void registerSnapshotMeta(final SnapshotMeta meta); + + SnapshotWriter getSnapshotWriter(); + + DLedgerEntry getSnapshotEntry(); +} diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/hook/SnapshotHook.java b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/hook/SnapshotHook.java new file mode 100644 index 00000000..8440d44f --- /dev/null +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/snapshot/hook/SnapshotHook.java @@ -0,0 +1,24 @@ +/* + * Copyright 2017-2022 The DLedger Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.storage.dledger.snapshot.hook; + +import io.openmessaging.storage.dledger.snapshot.SnapshotStatus; + +public interface SnapshotHook { + + void doCallBack(SnapshotStatus status); +} diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachine.java b/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachine.java index f9e40d7b..de03ee5d 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachine.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachine.java @@ -16,10 +16,10 @@ package io.openmessaging.storage.dledger.statemachine; +import io.openmessaging.storage.dledger.exception.DLedgerException; import io.openmessaging.storage.dledger.snapshot.SnapshotReader; import io.openmessaging.storage.dledger.snapshot.SnapshotWriter; import io.openmessaging.storage.dledger.utils.DLedgerUtils; -import java.util.concurrent.CompletableFuture; /** * Finite state machine, which should be implemented by user. @@ -35,13 +35,11 @@ public interface StateMachine { void onApply(final CommittedEntryIterator iter); /** - * User defined snapshot generate function, this method will block StateMachine#onApply(Iterator). - * Call done.run(status) when snapshot finished. + * User defined snapshot generate function. * * @param writer snapshot writer - * @param done callback */ - void onSnapshotSave(final SnapshotWriter writer, final CompletableFuture done); + boolean onSnapshotSave(final SnapshotWriter writer); /** * User defined snapshot load function. @@ -57,6 +55,14 @@ public interface StateMachine { */ void onShutdown(); + /** + * Once a critical error occurs, disallow any task enter the Dledger node + * until the error has been fixed and restart it. + * + * @param error DLedger error message + */ + void onError(final DLedgerException error); + /** * User must create DLedgerId by this method, it will generate the DLedgerId with format like that: 'dLedgerGroupId#dLedgerSelfId' * @param dLedgerGroupId the group id of the DLedgerServer diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java b/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java index 9d74a876..67779a9b 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java @@ -17,11 +17,25 @@ package io.openmessaging.storage.dledger.statemachine; import io.openmessaging.storage.dledger.DLedgerEntryPusher; +import io.openmessaging.storage.dledger.DLedgerServer; import io.openmessaging.storage.dledger.entry.DLedgerEntry; +import io.openmessaging.storage.dledger.exception.DLedgerException; +import io.openmessaging.storage.dledger.snapshot.SnapshotManager; +import io.openmessaging.storage.dledger.snapshot.SnapshotReader; +import io.openmessaging.storage.dledger.snapshot.SnapshotStatus; +import io.openmessaging.storage.dledger.snapshot.SnapshotWriter; +import io.openmessaging.storage.dledger.snapshot.SnapshotMeta; +import io.openmessaging.storage.dledger.snapshot.hook.LoadSnapshotHook; +import io.openmessaging.storage.dledger.snapshot.hook.SaveSnapshotHook; +import io.openmessaging.storage.dledger.snapshot.hook.SnapshotHook; import io.openmessaging.storage.dledger.store.DLedgerStore; + +import java.io.IOException; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; @@ -53,9 +67,10 @@ private static class ApplyTask { TaskType type; long committedIndex; long term; - CompletableFuture cb; + SnapshotHook snapshotHook; } + private static final long RETRY_ON_COMMITTED_DELAY = 1000; private static Logger logger = LoggerFactory.getLogger(StateMachineCaller.class); private final DLedgerStore dLedgerStore; private final StateMachine statemachine; @@ -64,7 +79,16 @@ private static class ApplyTask { private long lastAppliedTerm; private final AtomicLong applyingIndex; private final BlockingQueue taskQueue; + private final ScheduledExecutorService scheduledExecutorService = Executors + .newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "RetryOnCommittedScheduledThread"); + } + }); private final Function completeEntryCallback; + private volatile DLedgerException error; + private SnapshotManager snapshotManager; public StateMachineCaller(final DLedgerStore dLedgerStore, final StateMachine statemachine, final DLedgerEntryPusher entryPusher) { @@ -96,17 +120,17 @@ public boolean onCommitted(final long committedIndex) { return enqueueTask(task); } - public boolean onSnapshotLoad(final CompletableFuture cb) { + public boolean onSnapshotLoad(final LoadSnapshotHook loadSnapshotAfter) { final ApplyTask task = new ApplyTask(); task.type = TaskType.SNAPSHOT_LOAD; - task.cb = cb; + task.snapshotHook = loadSnapshotAfter; return enqueueTask(task); } - public boolean onSnapshotSave(final CompletableFuture cb) { + public boolean onSnapshotSave(final SaveSnapshotHook saveSnapshotAfter) { final ApplyTask task = new ApplyTask(); task.type = TaskType.SNAPSHOT_SAVE; - task.cb = cb; + task.snapshotHook = saveSnapshotAfter; return enqueueTask(task); } @@ -127,10 +151,10 @@ public void run() { doCommitted(task.committedIndex); break; case SNAPSHOT_SAVE: - doSnapshotSave(task.cb); + doSnapshotSave((SaveSnapshotHook) task.snapshotHook); break; case SNAPSHOT_LOAD: - doSnapshotLoad(task.cb); + doSnapshotLoad((LoadSnapshotHook) task.snapshotHook); break; } } @@ -143,6 +167,20 @@ public void run() { } private void doCommitted(final long committedIndex) { + if (this.error != null) { + return; + } + if (this.snapshotManager.isLoadingSnapshot()) { + this.scheduledExecutorService.schedule(() -> { + try { + onCommitted(committedIndex); + logger.info("Still loading snapshot, retry the commit task later"); + } catch (Throwable e) { + e.printStackTrace(); + } + }, RETRY_ON_COMMITTED_DELAY, TimeUnit.MILLISECONDS); + return; + } final long lastAppliedIndex = this.lastAppliedIndex.get(); if (lastAppliedIndex >= committedIndex) { return; @@ -157,7 +195,8 @@ private void doCommitted(final long committedIndex) { if (dLedgerEntry != null) { this.lastAppliedTerm = dLedgerEntry.getTerm(); } - + // Take snapshot + snapshotManager.saveSnapshot(dLedgerEntry); // Check response timeout. if (iter.getCompleteAckNums() == 0) { if (this.entryPusher != null) { @@ -166,10 +205,90 @@ private void doCommitted(final long committedIndex) { } } - private void doSnapshotLoad(final CompletableFuture cb) { + private void doSnapshotLoad(LoadSnapshotHook loadSnapshotAfter) { + // Get snapshot meta + SnapshotReader reader = loadSnapshotAfter.getSnapshotReader(); + SnapshotMeta snapshotMeta; + try { + snapshotMeta = reader.load(); + } catch (IOException e) { + logger.error(e.getMessage()); + loadSnapshotAfter.doCallBack(SnapshotStatus.FAIL); + return; + } + if (snapshotMeta == null) { + logger.error("Unable to load state machine meta"); + loadSnapshotAfter.doCallBack(SnapshotStatus.FAIL); + return; + } + // Compare snapshot meta with the last applied index and term + long snapshotIndex = snapshotMeta.getLastIncludedIndex(); + long snapshotTerm = snapshotMeta.getLastIncludedTerm(); + if (lastAppliedCompareToSnapshot(snapshotIndex, snapshotTerm) > 0) { + logger.warn("The snapshot loading is expired"); + loadSnapshotAfter.doCallBack(SnapshotStatus.EXPIRED); + return; + } + // Load data from the state machine + try { + if (!this.statemachine.onSnapshotLoad(reader)) { + logger.error("Unable to load data from snapshot into state machine"); + loadSnapshotAfter.doCallBack(SnapshotStatus.FAIL); + return; + } + } catch (Exception e) { + e.printStackTrace(); + loadSnapshotAfter.doCallBack(SnapshotStatus.FAIL); + return; + } + // Update statemachine info + this.lastAppliedIndex.set(snapshotMeta.getLastIncludedIndex()); + this.lastAppliedTerm = snapshotMeta.getLastIncludedTerm(); + loadSnapshotAfter.registerSnapshotMeta(snapshotMeta); + loadSnapshotAfter.doCallBack(SnapshotStatus.SUCCESS); + } + + private int lastAppliedCompareToSnapshot(long snapshotIndex, long snapshotTerm) { + // 1. Compare term 2. Compare index + int res = Long.compare(this.lastAppliedTerm, snapshotTerm); + if (res == 0) { + return Long.compare(this.lastAppliedIndex.get(), snapshotIndex); + } else { + return res; + } } - private void doSnapshotSave(final CompletableFuture cb) { + private void doSnapshotSave(SaveSnapshotHook saveSnapshotAfter) { + // Build and save snapshot meta + DLedgerEntry curEntry = saveSnapshotAfter.getSnapshotEntry(); + saveSnapshotAfter.registerSnapshotMeta(new SnapshotMeta(curEntry.getIndex(), curEntry.getTerm())); + SnapshotWriter writer = saveSnapshotAfter.getSnapshotWriter(); + if (writer == null) { + return; + } + // Save data through the state machine + try { + if (!this.statemachine.onSnapshotSave(writer)) { + logger.error("Unable to save snapshot data from state machine"); + saveSnapshotAfter.doCallBack(SnapshotStatus.FAIL); + return; + } + } catch (Exception e) { + e.printStackTrace(); + saveSnapshotAfter.doCallBack(SnapshotStatus.FAIL); + return; + } + saveSnapshotAfter.doCallBack(SnapshotStatus.SUCCESS); + } + + public void setError(DLedgerServer server, final DLedgerException error) { + this.error = error; + if (this.statemachine != null) { + this.statemachine.onError(error); + } + if (server != null) { + server.shutdown(); + } } @Override @@ -180,4 +299,20 @@ public String getServiceName() { public Long getLastAppliedIndex() { return this.lastAppliedIndex.get(); } + + public long getLastAppliedTerm() { + return lastAppliedTerm; + } + + public void registerSnapshotManager(SnapshotManager snapshotManager) { + this.snapshotManager = snapshotManager; + } + + public SnapshotManager getSnapshotManager() { + return this.snapshotManager; + } + + public DLedgerStore getdLedgerStore() { + return dLedgerStore; + } } diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerStore.java b/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerStore.java index 2bf9a2f0..6f1f981b 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerStore.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerStore.java @@ -57,6 +57,10 @@ public long truncate(DLedgerEntry entry, long leaderTerm, String leaderId) { return -1; } + public void resetOffsetAfterSnapshot(DLedgerEntry entry) { + + } + public void startup() { } diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java b/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java index 76a30077..ada127d8 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java @@ -339,7 +339,6 @@ private void reviseLedgerBeginIndex() { } finally { SelectMmapBufferResult.release(sbr); } - } @Override @@ -473,6 +472,15 @@ private long calculateWherePosition(final MmapFileList mappedFileList, long cont return mappedFileList.getFlushedWhere(); } + @Override + public void resetOffsetAfterSnapshot(DLedgerEntry entry) { + long resetPos = entry.getPos() + entry.getSize(); + dataFileList.resetOffset(resetPos); + long resetIndexOffset = entry.getIndex() * INDEX_UNIT_SIZE; + indexFileList.resetOffset(resetIndexOffset); + reviseLedgerBeginIndex(); + } + @Override public DLedgerEntry appendAsFollower(DLedgerEntry entry, long leaderTerm, String leaderId) { PreConditions.check(memberState.isFollower(), DLedgerResponseCode.NOT_FOLLOWER, "role=%s", memberState.getRole()); diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/utils/IOUtils.java b/dledger/src/main/java/io/openmessaging/storage/dledger/utils/IOUtils.java index 08b04cea..2a6e098b 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/utils/IOUtils.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/utils/IOUtils.java @@ -31,7 +31,13 @@ import java.net.SocketException; import java.net.URL; import java.net.URLConnection; +import java.nio.channels.FileChannel; import java.nio.charset.Charset; +import java.nio.file.AtomicMoveNotSupportedException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Enumeration; import java.util.List; @@ -313,19 +319,77 @@ public static String humanReadableByteCount(long bytes, boolean si) { return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre); } - public static void deleteFile(File file) { + public static void deleteFile(File file) throws IOException { if (!file.exists()) { return; } if (file.isFile()) { - file.delete(); + if (!file.delete()) { + throw new IOException("Unable to delete file: " + file); + } } else if (file.isDirectory()) { File[] files = file.listFiles(); for (File file1 : files) { deleteFile(file1); } - file.delete(); + if (!file.delete()) { + throw new IOException("Unable to delete directory: " + file); + } + } + } + + public static void mkDir(File dir) throws IOException { + if (dir.exists() && !dir.isDirectory()) { + throw new IOException("Unable to create directory: File " + dir + " exists and is not a directory."); + } else if (!dir.mkdirs() && !dir.isDirectory()) { + throw new IOException("Unable to create directory " + dir); } } + public static void atomicMvFile(File srcFile, File destFile) throws IOException { + Path srcPath = srcFile.toPath(); + Path destPath = destFile.toPath(); + try { + // Atomic move + Files.move(srcPath, destPath, StandardCopyOption.ATOMIC_MOVE); + } catch (IOException mvFailed) { + if (mvFailed instanceof AtomicMoveNotSupportedException) { + logger.warn("Unable to support atomic move, back to non-atomic move, error: {}", mvFailed.getMessage()); + } else { + logger.warn("Unable to move files atomically, back to non-atomic move, error: {}", mvFailed.getMessage()); + } + if (destFile.exists()) { + logger.info("The file has already existed in the destination location {}", destPath); + } + try { + Files.move(srcPath, destPath, StandardCopyOption.REPLACE_EXISTING); + } catch (IOException replaceFailed) { + replaceFailed.addSuppressed(mvFailed); + logger.warn("Unable to move {} to {}. Try deleting {}", srcPath, destPath, srcPath); + try { + Files.deleteIfExists(srcPath); + } catch (IOException deleteFailed) { + deleteFailed.addSuppressed(replaceFailed); + logger.warn("Unable to delete {}", srcPath); + throw deleteFailed; + } + throw replaceFailed; + } + } + // Force sync + fsync(destFile.getParentFile()); + } + + public static void fsync(File file) throws IOException { + boolean isDir = file.isDirectory(); + // Unable to force sync on Windows + if (isDir && System.getProperty("os.name").toLowerCase().contains("win")) { + logger.warn("Unable to force sync directory {} on Windows", file); + return; + } + try (final FileChannel fc = FileChannel.open(file.toPath(), isDir ? StandardOpenOption.READ + : StandardOpenOption.WRITE)) { + fc.force(true); + } + } } diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/ServerTestHarness.java b/dledger/src/test/java/io/openmessaging/storage/dledger/ServerTestHarness.java index b6115350..2f951b55 100644 --- a/dledger/src/test/java/io/openmessaging/storage/dledger/ServerTestHarness.java +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/ServerTestHarness.java @@ -17,6 +17,7 @@ package io.openmessaging.storage.dledger; import io.openmessaging.storage.dledger.client.DLedgerClient; +import io.openmessaging.storage.dledger.statemachine.MockStateMachine; import io.openmessaging.storage.dledger.util.FileTestUtil; import java.io.File; import java.util.List; @@ -72,6 +73,33 @@ protected synchronized DLedgerServer launchServer(String group, String peers, St return dLedgerServer; } + protected synchronized DLedgerServer launchServerWithStateMachine(String group, String peers, String selfId, String leaderId, + String storeType, int snapshotThreshold, int mappedFileSizeForEntryData) { + DLedgerConfig config = new DLedgerConfig(); + config.group(group).selfId(selfId).peers(peers); + config.setStoreBaseDir(FileTestUtil.TEST_BASE + File.separator + group); + config.setStoreType(storeType); + config.setSnapshotThreshold(snapshotThreshold); + config.setMappedFileSizeForEntryData(mappedFileSizeForEntryData); + config.setEnableLeaderElector(false); + config.setEnableDiskForceClean(false); + config.setDiskSpaceRatioToForceClean(0.90f); + DLedgerServer dLedgerServer = new DLedgerServer(config); + MemberState memberState = dLedgerServer.getMemberState(); + memberState.setCurrTermForTest(0); + if (selfId.equals(leaderId)) { + memberState.changeToLeader(0); + } else { + memberState.changeToFollower(0, leaderId); + } + bases.add(config.getDataStorePath()); + bases.add(config.getIndexStorePath()); + bases.add(config.getDefaultPath()); + dLedgerServer.registerStateMachine(new MockStateMachine()); + dLedgerServer.startup(); + return dLedgerServer; + } + protected synchronized DLedgerServer launchServerEnableBatchPush(String group, String peers, String selfId, String leaderId, String storeType) { DLedgerConfig config = new DLedgerConfig(); diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotManagerTest.java b/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotManagerTest.java new file mode 100644 index 00000000..a71334b7 --- /dev/null +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotManagerTest.java @@ -0,0 +1,135 @@ +package io.openmessaging.storage.dledger.snapshot; + +import com.alibaba.fastjson.JSON; +import io.openmessaging.storage.dledger.DLedgerConfig; +import io.openmessaging.storage.dledger.DLedgerServer; +import io.openmessaging.storage.dledger.ServerTestHarness; +import io.openmessaging.storage.dledger.client.DLedgerClient; +import io.openmessaging.storage.dledger.protocol.AppendEntryResponse; +import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode; +import io.openmessaging.storage.dledger.statemachine.MockStateMachine; +import io.openmessaging.storage.dledger.statemachine.StateMachineCaller; +import io.openmessaging.storage.dledger.util.FileTestUtil; +import io.openmessaging.storage.dledger.utils.IOUtils; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class SnapshotManagerTest extends ServerTestHarness { + + + @Test + public void testSaveAndLoadSnapshot() throws InterruptedException { + // Launch server + String group = UUID.randomUUID().toString(); + String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", nextPort(), nextPort(), nextPort()); + DLedgerServer dLedgerServer0 = launchServerWithStateMachine(group, peers, "n0", "n1", DLedgerConfig.FILE, 10, 1024); + DLedgerServer dLedgerServer1 = launchServerWithStateMachine(group, peers, "n1", "n1", DLedgerConfig.FILE, 10, 1024); + DLedgerServer dLedgerServer2 = launchServerWithStateMachine(group, peers, "n2", "n1", DLedgerConfig.FILE, 10, 1024); + final List serverList = new ArrayList() { + { + add(dLedgerServer0); + add(dLedgerServer1); + add(dLedgerServer2); + } + }; + // Launch client + DLedgerClient dLedgerClient = launchClient(group, peers.split(";")[0]); + for (int i = 0; i < 100; i++) { + AppendEntryResponse appendEntryResponse = dLedgerClient.append(new byte[512]); + assertEquals(DLedgerResponseCode.SUCCESS.getCode(), appendEntryResponse.getCode()); + assertEquals(i, appendEntryResponse.getIndex()); + } + Thread.sleep(1200); + for (DLedgerServer server : serverList) { + assertEquals(99, server.getdLedgerStore().getLedgerEndIndex()); + } + // Check state machine + for (DLedgerServer server : serverList) { + final MockStateMachine fsm = (MockStateMachine) server.getStateMachine(); + assertEquals(99, fsm.getAppliedIndex()); + assertEquals(100, fsm.getTotalEntries()); + } + Thread.sleep(100); + // Shutdown server + dLedgerServer0.shutdown(); + dLedgerServer1.shutdown(); + dLedgerServer2.shutdown(); + serverList.clear(); + // Restart server and apply snapshot + DLedgerServer newDLedgerServer0 = launchServerWithStateMachine(group, peers, "n0", "n0", DLedgerConfig.FILE, 10, 1024); + DLedgerServer newDLedgerServer1 = launchServerWithStateMachine(group, peers, "n1", "n0", DLedgerConfig.FILE, 10, 1024); + DLedgerServer newDLedgerServer2 = launchServerWithStateMachine(group, peers, "n2", "n0", DLedgerConfig.FILE, 10, 1024); + serverList.add(newDLedgerServer0); + serverList.add(newDLedgerServer1); + serverList.add(newDLedgerServer2); + Thread.sleep(1000); + // State machine could only be recovered from snapshot due to the entry has been removed after saving snapshot + for (DLedgerServer server : serverList) { + final MockStateMachine fsm = (MockStateMachine) server.getStateMachine(); + assertEquals(99, server.getFsmCaller().getLastAppliedIndex()); + assertEquals(100, fsm.getTotalEntries()); + } + } + + @Test + public void testSnapshotReservedNum() throws InterruptedException { + String group = UUID.randomUUID().toString(); + String selfId = "n0"; + String peers = String.format("%s-localhost:%d", selfId, nextPort()); + DLedgerServer server = launchServerWithStateMachine(group, peers, selfId, "n0", DLedgerConfig.FILE, 10, 1024); + + DLedgerClient dLedgerClient = launchClient(group, peers); + for (int i = 0; i < 120; i++) { + AppendEntryResponse appendEntryResponse = dLedgerClient.append(new byte[512]); + assertEquals(DLedgerResponseCode.SUCCESS.getCode(), appendEntryResponse.getCode()); + assertEquals(i, appendEntryResponse.getIndex()); + Thread.sleep(100); + } + // Check snapshot reserved number + int snapshotCnt = Objects.requireNonNull(new File(server.getDLedgerConfig().getSnapshotStoreBaseDir()).listFiles()).length; + int maxSnapshotReservedNum = server.getDLedgerConfig().getMaxSnapshotReservedNum(); + assertEquals(snapshotCnt, maxSnapshotReservedNum); + } + + @Test + public void testLoadErrorSnapshot() throws Exception { + String group = UUID.randomUUID().toString(); + String selfId = "n0"; + String peers = String.format("%s-localhost:%d", selfId, nextPort()); + String snapshotBaseDirPrefix = FileTestUtil.TEST_BASE + File.separator + group + File.separator + "dledger-" + + selfId + File.separator + "snapshot" + File.separator + SnapshotManager.SNAPSHOT_DIR_PREFIX; + + // Build error snapshot without state machine data + long errorSnapshotIdx1 = 10; + String errorSnapshotStoreBasePath1 = snapshotBaseDirPrefix + errorSnapshotIdx1; + IOUtils.string2File(JSON.toJSONString(new SnapshotMeta(errorSnapshotIdx1, 1)), + errorSnapshotStoreBasePath1 + File.separator + SnapshotManager.SNAPSHOT_META_FILE); + + // Build error snapshot without state machine meta + long errorSnapshotIdx2 = 9; + String errorSnapshotStoreBasePath2 = snapshotBaseDirPrefix + errorSnapshotIdx2; + IOUtils.string2File("100", errorSnapshotStoreBasePath2 + File.separator + SnapshotManager.SNAPSHOT_DATA_FILE); + + long snapshotIdx = 8; + String snapshotStoreBasePath = snapshotBaseDirPrefix + snapshotIdx; + SnapshotMeta snapshotMeta = new SnapshotMeta(snapshotIdx, 1); + IOUtils.string2File(JSON.toJSONString(snapshotMeta), snapshotStoreBasePath + File.separator + SnapshotManager.SNAPSHOT_META_FILE); + IOUtils.string2File("80", snapshotStoreBasePath + File.separator + SnapshotManager.SNAPSHOT_DATA_FILE); + + DLedgerServer server = launchServerWithStateMachine(group, peers, "n0", "n0", DLedgerConfig.FILE, 10, 10 * 1024 * 1024); + Thread.sleep(1000); + + StateMachineCaller caller = server.getFsmCaller(); + MockStateMachine fsm = (MockStateMachine) caller.getStateMachine(); + assertEquals(caller.getLastAppliedIndex(), 8); + assertEquals(fsm.getTotalEntries(), 80); + caller.shutdown(); + } +} diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotReaderTest.java b/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotReaderTest.java new file mode 100644 index 00000000..f538351b --- /dev/null +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotReaderTest.java @@ -0,0 +1,31 @@ +package io.openmessaging.storage.dledger.snapshot; + +import com.alibaba.fastjson.JSON; +import io.openmessaging.storage.dledger.snapshot.file.FileSnapshotReader; +import io.openmessaging.storage.dledger.util.FileTestUtil; +import io.openmessaging.storage.dledger.utils.IOUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; + +public class SnapshotReaderTest { + + @Test + public void testReaderLoad() throws IOException { + String metaFilePath = FileTestUtil.TEST_BASE + File.separator + SnapshotManager.SNAPSHOT_META_FILE; + try { + SnapshotMeta snapshotMeta = new SnapshotMeta(10, 0); + IOUtils.string2File(JSON.toJSONString(snapshotMeta), metaFilePath); + + SnapshotReader reader = new FileSnapshotReader(FileTestUtil.TEST_BASE); + Assertions.assertNull(reader.getSnapshotMeta()); + Assertions.assertEquals(reader.getSnapshotStorePath(), FileTestUtil.TEST_BASE); + Assertions.assertEquals(reader.load().toString(), snapshotMeta.toString()); + } finally { + IOUtils.deleteFile(new File(metaFilePath)); + } + } +} diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotStoreTest.java b/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotStoreTest.java new file mode 100644 index 00000000..b3208416 --- /dev/null +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotStoreTest.java @@ -0,0 +1,35 @@ +package io.openmessaging.storage.dledger.snapshot; + +import com.alibaba.fastjson.JSON; +import io.openmessaging.storage.dledger.snapshot.file.FileSnapshotStore; +import io.openmessaging.storage.dledger.util.FileTestUtil; +import io.openmessaging.storage.dledger.utils.IOUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; + +public class SnapshotStoreTest { + + @Test + public void testCreateReaderAndWriter() throws IOException { + final long lastSnapshotIndex = 10; + try { + FileSnapshotStore writerStore = new FileSnapshotStore(FileTestUtil.TEST_BASE); + SnapshotWriter writer = writerStore.createSnapshotWriter(); + Assertions.assertNotNull(writer); + SnapshotMeta writerMeta = new SnapshotMeta(lastSnapshotIndex, 0); + writer.setSnapshotMeta(writerMeta); + writer.save(SnapshotStatus.SUCCESS); + + FileSnapshotStore readerStore = new FileSnapshotStore(FileTestUtil.TEST_BASE); + SnapshotReader reader = readerStore.createSnapshotReader(); + Assertions.assertNotNull(reader); + SnapshotMeta readerMeta = reader.load(); + Assertions.assertEquals(writerMeta.toString(), readerMeta.toString()); + } finally { + IOUtils.deleteFile(new File(FileTestUtil.TEST_BASE + File.separator + SnapshotManager.SNAPSHOT_DIR_PREFIX + lastSnapshotIndex)); + } + } +} diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotWriterTest.java b/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotWriterTest.java new file mode 100644 index 00000000..21df4363 --- /dev/null +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotWriterTest.java @@ -0,0 +1,33 @@ +package io.openmessaging.storage.dledger.snapshot; + +import com.alibaba.fastjson.JSON; +import io.openmessaging.storage.dledger.snapshot.file.FileSnapshotStore; +import io.openmessaging.storage.dledger.snapshot.file.FileSnapshotWriter; +import io.openmessaging.storage.dledger.util.FileTestUtil; +import io.openmessaging.storage.dledger.utils.IOUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; + +public class SnapshotWriterTest { + + @Test + public void testWriterSave() throws IOException { + final long lastSnapshotIndex = 10; + SnapshotWriter writer = new FileSnapshotWriter(FileTestUtil.TEST_BASE + File.separator + "tmp", new FileSnapshotStore(FileTestUtil.TEST_BASE)); + + SnapshotMeta snapshotMeta = new SnapshotMeta(lastSnapshotIndex, 0); + writer.setSnapshotMeta(snapshotMeta); + writer.save(SnapshotStatus.SUCCESS); + + String testDir = FileTestUtil.TEST_BASE + File.separator + SnapshotManager.SNAPSHOT_DIR_PREFIX + lastSnapshotIndex; + try { + Assertions.assertEquals(IOUtils.file2String(testDir + File.separator + + SnapshotManager.SNAPSHOT_META_FILE), JSON.toJSONString(snapshotMeta)); + } finally { + IOUtils.deleteFile(new File(testDir)); + } + } +} diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/statemachine/MockSnapshotFile.java b/dledger/src/test/java/io/openmessaging/storage/dledger/statemachine/MockSnapshotFile.java new file mode 100644 index 00000000..fbf3c894 --- /dev/null +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/statemachine/MockSnapshotFile.java @@ -0,0 +1,38 @@ +package io.openmessaging.storage.dledger.statemachine; + +import io.openmessaging.storage.dledger.utils.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +public class MockSnapshotFile { + + private static Logger logger = LoggerFactory.getLogger(MockSnapshotFile.class); + + private final String snapshotStorePath; + + public MockSnapshotFile(String snapshotStorePath) { + this.snapshotStorePath = snapshotStorePath; + } + + public boolean save(final long value) { + try { + IOUtils.string2File(String.valueOf(value), snapshotStorePath); + return true; + } catch (IOException e) { + logger.error("Unable to save snapshot data", e); + return false; + } + } + + public long load() throws IOException { + String str = IOUtils.file2String(new File(snapshotStorePath)); + if (str != null && str.length() != 0) { + return Long.parseLong(str); + } else { + throw new IOException("Unable to load snapshot data from " + snapshotStorePath); + } + } +} \ No newline at end of file diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/statemachine/MockStateMachine.java b/dledger/src/test/java/io/openmessaging/storage/dledger/statemachine/MockStateMachine.java index b3ec7bc0..dcddd9de 100644 --- a/dledger/src/test/java/io/openmessaging/storage/dledger/statemachine/MockStateMachine.java +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/statemachine/MockStateMachine.java @@ -16,16 +16,22 @@ package io.openmessaging.storage.dledger.statemachine; -import java.util.concurrent.CompletableFuture; - import io.openmessaging.storage.dledger.entry.DLedgerEntry; -import io.openmessaging.storage.dledger.snapshot.SnapshotReader; -import io.openmessaging.storage.dledger.snapshot.SnapshotWriter; +import io.openmessaging.storage.dledger.exception.DLedgerException; +import io.openmessaging.storage.dledger.snapshot.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; public class MockStateMachine implements StateMachine { - private volatile long appliedIndex = -1; - private volatile long totalEntries; + private static Logger logger = LoggerFactory.getLogger(MockStateMachine.class); + private volatile long appliedIndex = -1L; + private final AtomicLong totalEntries = new AtomicLong(0); + private final AtomicLong lastAppliedIndex = new AtomicLong(-1); @Override public void onApply(final CommittedEntryIterator iter) { @@ -35,19 +41,31 @@ public void onApply(final CommittedEntryIterator iter) { if (next.getIndex() <= this.appliedIndex) { continue; } + this.totalEntries.addAndGet(1); this.appliedIndex = next.getIndex(); - this.totalEntries += 1; } } } @Override - public void onSnapshotSave(final SnapshotWriter writer, final CompletableFuture done) { + public boolean onSnapshotSave(final SnapshotWriter writer) { + long curEntryCnt = this.totalEntries.get(); + MockSnapshotFile snapshotFile = new MockSnapshotFile(writer.getSnapshotStorePath() + File.separator + SnapshotManager.SNAPSHOT_DATA_FILE); + return snapshotFile.save(curEntryCnt); } @Override public boolean onSnapshotLoad(final SnapshotReader reader) { - return false; + // Apply snapshot data + MockSnapshotFile snapshotFile = new MockSnapshotFile(reader.getSnapshotStorePath() + + File.separator + SnapshotManager.SNAPSHOT_DATA_FILE); + try { + this.totalEntries.set(snapshotFile.load()); + return true; + } catch (IOException e) { + e.printStackTrace(); + return false; + } } @Override @@ -55,6 +73,11 @@ public void onShutdown() { } + @Override + public void onError(DLedgerException error) { + logger.error("DLedger Error: {}", error.getMessage(), error); + } + @Override public String getBindDLedgerId() { return null; @@ -65,6 +88,6 @@ public long getAppliedIndex() { } public long getTotalEntries() { - return totalEntries; + return this.totalEntries.get(); } } diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/statemachine/StateMachineCallerTest.java b/dledger/src/test/java/io/openmessaging/storage/dledger/statemachine/StateMachineCallerTest.java index ece214aa..ff4d0fe9 100644 --- a/dledger/src/test/java/io/openmessaging/storage/dledger/statemachine/StateMachineCallerTest.java +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/statemachine/StateMachineCallerTest.java @@ -16,65 +16,169 @@ package io.openmessaging.storage.dledger.statemachine; +import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import com.alibaba.fastjson.JSON; import io.openmessaging.storage.dledger.DLedgerConfig; import io.openmessaging.storage.dledger.DLedgerServer; import io.openmessaging.storage.dledger.MemberState; import io.openmessaging.storage.dledger.ServerTestHarness; +import io.openmessaging.storage.dledger.snapshot.SnapshotManager; +import io.openmessaging.storage.dledger.snapshot.SnapshotMeta; +import io.openmessaging.storage.dledger.snapshot.SnapshotReader; +import io.openmessaging.storage.dledger.snapshot.SnapshotStatus; +import io.openmessaging.storage.dledger.snapshot.file.FileSnapshotReader; +import io.openmessaging.storage.dledger.snapshot.hook.LoadSnapshotHook; +import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore; +import io.openmessaging.storage.dledger.util.FileTestUtil; +import io.openmessaging.storage.dledger.utils.IOUtils; import org.junit.jupiter.api.Test; import io.openmessaging.storage.dledger.client.DLedgerClient; import io.openmessaging.storage.dledger.protocol.AppendEntryResponse; import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode; import io.openmessaging.storage.dledger.entry.DLedgerEntry; -import io.openmessaging.storage.dledger.store.DLedgerMemoryStore; import io.openmessaging.storage.dledger.utils.Pair; import static org.junit.jupiter.api.Assertions.assertEquals; - class StateMachineCallerTest extends ServerTestHarness { - public Pair mockCaller() { - DLedgerConfig config = new DLedgerConfig(); - MemberState memberState = new MemberState(config); - memberState.changeToLeader(0); - final DLedgerMemoryStore dLedgerMemoryStore = new DLedgerMemoryStore(config, memberState); - for (int i = 0; i < 10; i++) { - final DLedgerEntry entry = new DLedgerEntry(); - entry.setIndex(i); - entry.setTerm(0); - dLedgerMemoryStore.appendAsLeader(entry); - } - final MockStateMachine fsm = new MockStateMachine(); - final StateMachineCaller caller = new StateMachineCaller(dLedgerMemoryStore, fsm, null); - caller.start(); - return new Pair<>(caller, fsm); - } + private DLedgerConfig config; @Test - public void testOnCommitted() throws Exception { - final Pair result = mockCaller(); + public void testOnCommittedAndOnSnapshotSave() throws Exception { + String group = UUID.randomUUID().toString(); + String selfId = "n0"; + String leaderId = "n0"; + String peers = String.format("%s-localhost:%d", selfId, nextPort()); + + final DLedgerServer dLedgerServer = createDLedgerServer(group, peers, selfId, leaderId); + final Pair result = mockCaller(dLedgerServer); + updateFileStore((DLedgerMmapFileStore) dLedgerServer.getDLedgerStore(), 10); final StateMachineCaller caller = result.getKey(); final MockStateMachine fsm = result.getValue(); + caller.onCommitted(9); Thread.sleep(1000); assertEquals(fsm.getAppliedIndex(), 9); assertEquals(fsm.getTotalEntries(), 10); + // Check onSnapshotSave result + String snapshotMetaJSON = IOUtils.file2String(this.config.getSnapshotStoreBaseDir() + File.separator + + SnapshotManager.SNAPSHOT_DIR_PREFIX + fsm.getAppliedIndex() + File.separator + + SnapshotManager.SNAPSHOT_META_FILE); + SnapshotMeta snapshotMetaFromJSON = JSON.parseObject(snapshotMetaJSON, SnapshotMeta.class); + assertEquals(snapshotMetaFromJSON.getLastIncludedIndex(), 9); + assertEquals(snapshotMetaFromJSON.getLastIncludedTerm(), 0); + String snapshotData = IOUtils.file2String(this.config.getSnapshotStoreBaseDir() + File.separator + + SnapshotManager.SNAPSHOT_DIR_PREFIX + fsm.getAppliedIndex() + File.separator + SnapshotManager.SNAPSHOT_DATA_FILE); + assertEquals(Long.parseLong(snapshotData), 10); + caller.shutdown(); + } + + @Test + public void testOnSnapshotLoad() throws Exception { + String group = UUID.randomUUID().toString(); + String selfId = "n0"; + String leaderId = "n0"; + String peers = String.format("%s-localhost:%d", selfId, nextPort()); + + final DLedgerServer dLedgerServer = createDLedgerServer(group, peers, selfId, leaderId); + final Pair result = mockCaller(dLedgerServer); + final StateMachineCaller caller = result.getKey(); + final MockStateMachine fsm = result.getValue(); + + final long lastIncludedIndex = 10; + String snapshotStoreBasePath = this.config.getSnapshotStoreBaseDir() + File.separator + SnapshotManager.SNAPSHOT_DIR_PREFIX + lastIncludedIndex; + SnapshotMeta snapshotMeta = new SnapshotMeta(lastIncludedIndex, 1); + IOUtils.string2File(JSON.toJSONString(snapshotMeta), snapshotStoreBasePath + File.separator + SnapshotManager.SNAPSHOT_META_FILE); + IOUtils.string2File("90", snapshotStoreBasePath + File.separator + SnapshotManager.SNAPSHOT_DATA_FILE); + + SnapshotReader reader = new FileSnapshotReader(snapshotStoreBasePath); + final CountDownLatch latch = new CountDownLatch(1); + caller.onSnapshotLoad(new LoadSnapshotHook() { + @Override + public SnapshotReader getSnapshotReader() { + return reader; + } + + @Override + public void registerSnapshotMeta(SnapshotMeta snapshotMeta) { + + } + + @Override + public void doCallBack(SnapshotStatus status) { + assertEquals(status.getCode(), SnapshotStatus.SUCCESS.getCode()); + latch.countDown(); + } + }); + latch.await(); + assertEquals(caller.getLastAppliedIndex(), 10); + assertEquals(fsm.getTotalEntries(), 90); caller.shutdown(); } + private DLedgerServer createDLedgerServer(String group, String peers, String selfId, String leaderId) { + this.config = new DLedgerConfig(); + this.config.group(group).selfId(selfId).peers(peers); + this.config.setStoreBaseDir(FileTestUtil.TEST_BASE + File.separator + group); + this.config.setSnapshotThreshold(0); + this.config.setStoreType(DLedgerConfig.FILE); + this.config.setMappedFileSizeForEntryData(10 * 1024 * 1024); + this.config.setEnableLeaderElector(false); + this.config.setEnableDiskForceClean(false); + this.config.setDiskSpaceRatioToForceClean(0.90f); + DLedgerServer dLedgerServer = new DLedgerServer(this.config); + MemberState memberState = dLedgerServer.getMemberState(); + memberState.setCurrTermForTest(0); + if (selfId.equals(leaderId)) { + memberState.changeToLeader(0); + } else { + memberState.changeToFollower(0, leaderId); + } + bases.add(this.config.getDataStorePath()); + bases.add(this.config.getIndexStorePath()); + bases.add(this.config.getDefaultPath()); + return dLedgerServer; + } + + public Pair mockCaller(DLedgerServer server) { + MockStateMachine fsm = new MockStateMachine(); + server.registerStateMachine(fsm); + StateMachineCaller caller = server.getFsmCaller(); + caller.start(); + server.getDLedgerStore().startup(); + return new Pair<>(caller, (MockStateMachine) caller.getStateMachine()); + } + + private void updateFileStore(DLedgerMmapFileStore fileStore, int entryNum) { + MemberState memberState = fileStore.getMemberState(); + memberState.changeToLeader(0); + for (int i = 0; i < entryNum; i++) { + DLedgerEntry entry = new DLedgerEntry(); + entry.setBody((new byte[1024])); + DLedgerEntry resEntry = fileStore.appendAsLeader(entry); + assertEquals(i, resEntry.getIndex()); + } + fileStore.updateCommittedIndex(memberState.currTerm(), entryNum - 1); + while (fileStore.getFlushPos() != fileStore.getWritePos()) { + fileStore.flush(); + } + } + @Test - public void testOnCommittedWithServer() throws InterruptedException { + public void testServerWithStateMachine() throws InterruptedException { String group = UUID.randomUUID().toString(); String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", nextPort(), nextPort(), nextPort()); - DLedgerServer dLedgerServer0 = launchServer(group, peers, "n0", "n1", DLedgerConfig.MEMORY); - DLedgerServer dLedgerServer1 = launchServer(group, peers, "n1", "n1", DLedgerConfig.MEMORY); - DLedgerServer dLedgerServer2 = launchServer(group, peers, "n2", "n1", DLedgerConfig.MEMORY); + DLedgerServer dLedgerServer0 = launchServerWithStateMachine(group, peers, "n0", "n1", DLedgerConfig.FILE, 0, 10 * 1024 * 1024); + DLedgerServer dLedgerServer1 = launchServerWithStateMachine(group, peers, "n1", "n1", DLedgerConfig.FILE, 0, 10 * 1024 * 1024); + DLedgerServer dLedgerServer2 = launchServerWithStateMachine(group, peers, "n2", "n1", DLedgerConfig.FILE, 0, 10 * 1024 * 1024); final List serverList = new ArrayList() { { add(dLedgerServer0); @@ -82,11 +186,6 @@ public void testOnCommittedWithServer() throws InterruptedException { add(dLedgerServer2); } }; - // Register state machine - for (DLedgerServer server : serverList) { - final MockStateMachine fsm = new MockStateMachine(); - server.registerStateMachine(fsm); - } DLedgerClient dLedgerClient = launchClient(group, peers.split(";")[0]); for (int i = 0; i < 10; i++) { @@ -94,12 +193,11 @@ public void testOnCommittedWithServer() throws InterruptedException { assertEquals(DLedgerResponseCode.SUCCESS.getCode(), appendEntryResponse.getCode()); assertEquals(i, appendEntryResponse.getIndex()); } - Thread.sleep(1000); + Thread.sleep(1200); for (DLedgerServer server : serverList) { assertEquals(9, server.getdLedgerStore().getLedgerEndIndex()); } - - // Check statemachine + // Check state machine for (DLedgerServer server : serverList) { final MockStateMachine fsm = (MockStateMachine) server.getStateMachine(); assertEquals(9, fsm.getAppliedIndex()); diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/util/FileTestUtil.java b/dledger/src/test/java/io/openmessaging/storage/dledger/util/FileTestUtil.java index 116cfce2..421123c4 100644 --- a/dledger/src/test/java/io/openmessaging/storage/dledger/util/FileTestUtil.java +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/util/FileTestUtil.java @@ -18,6 +18,7 @@ import io.openmessaging.storage.dledger.utils.IOUtils; import java.io.File; +import java.io.IOException; import java.util.UUID; public class FileTestUtil { @@ -40,11 +41,11 @@ public static String createTestDir(String prefix) { return baseDir; } - public static void deleteFile(String fileName) { + public static void deleteFile(String fileName) throws IOException { IOUtils.deleteFile(new File(fileName)); } - public static void deleteFile(File file) { + public static void deleteFile(File file) throws IOException { IOUtils.deleteFile(file); } diff --git a/pom.xml b/pom.xml index a040ab7a..0006077e 100644 --- a/pom.xml +++ b/pom.xml @@ -304,4 +304,4 @@ - + \ No newline at end of file diff --git a/proxy/src/test/java/io/openmessaging/storage/dledger/proxy/util/FileTestUtil.java b/proxy/src/test/java/io/openmessaging/storage/dledger/proxy/util/FileTestUtil.java index 5399b433..4822af76 100644 --- a/proxy/src/test/java/io/openmessaging/storage/dledger/proxy/util/FileTestUtil.java +++ b/proxy/src/test/java/io/openmessaging/storage/dledger/proxy/util/FileTestUtil.java @@ -18,6 +18,7 @@ import io.openmessaging.storage.dledger.utils.IOUtils; import java.io.File; +import java.io.IOException; import java.util.UUID; public class FileTestUtil { @@ -40,11 +41,11 @@ public static String createTestDir(String prefix) { return baseDir; } - public static void deleteFile(String fileName) { + public static void deleteFile(String fileName) throws IOException { IOUtils.deleteFile(new File(fileName)); } - public static void deleteFile(File file) { + public static void deleteFile(File file) throws IOException { IOUtils.deleteFile(file); }