diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStore.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStore.java
index 7c55da1a3f..bd1727790b 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStore.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendStore.java
@@ -20,7 +20,7 @@
 package com.baidu.hugegraph.backend.store;
 
 import java.util.Iterator;
-import java.util.Set;
+import java.util.Map;
 
 import com.baidu.hugegraph.backend.id.Id;
 import com.baidu.hugegraph.backend.id.IdGenerator;
@@ -117,7 +117,7 @@ public default void setCounterLowest(HugeType type, long lowest) {
     // Get current counter for a specific type
     public long getCounter(HugeType type);
 
-    public default Set<String> createSnapshot(String snapshotDir) {
+    public default Map<String, String> createSnapshot(String snapshotDir) {
         throw new UnsupportedOperationException("createSnapshot");
     }
 
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java
index 30ac92a7b1..54a8b9c32b 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreSnapshotFile.java
@@ -22,11 +22,15 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.zip.Checksum;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 
 import com.alipay.sofa.jraft.Closure;
@@ -36,10 +40,12 @@
 import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
 import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
 import com.alipay.sofa.jraft.util.CRC64;
+import com.baidu.hugegraph.testutil.Whitebox;
 import com.baidu.hugegraph.util.CompressUtil;
 import com.baidu.hugegraph.util.E;
 import com.baidu.hugegraph.util.InsertionOrderUtil;
 import com.baidu.hugegraph.util.Log;
+import com.google.protobuf.ByteString;
 
 public class StoreSnapshotFile {
@@ -47,25 +53,42 @@ public class StoreSnapshotFile {
 
     public static final String SNAPSHOT_DIR = "snapshot";
     private static final String TAR = ".tar";
-    private static final String SNAPSHOT_TAR = SNAPSHOT_DIR + TAR;
-    private static final String MANIFEST = "manifest";
 
     private final RaftBackendStore[] stores;
+    private final Map<String, String> dataDisks;
 
     public StoreSnapshotFile(RaftBackendStore[] stores) {
         this.stores = stores;
+        this.dataDisks = new HashMap<>();
+        for (RaftBackendStore raftStore : stores) {
+            // Call RocksDBStore method reportDiskMapping()
+            this.dataDisks.putAll(Whitebox.invoke(raftStore, "store",
+                                                  "reportDiskMapping"));
+        }
+        /*
+         * For example:
+         * general=/parent_path/rocksdb-data
+         * g/VERTEX=/parent_path/rocksdb-vertex
+         */
+        LOG.debug("The store data disks mapping {}", this.dataDisks);
    }
 
     public void save(SnapshotWriter writer, Closure done,
                      ExecutorService executor) {
         try {
             // Write snapshot to real directory
-            Set<String> snapshotDirs = this.doSnapshotSave();
+            Map<String, String> snapshotDirMaps = this.doSnapshotSave();
             executor.execute(() -> {
-                String jraftSnapshotPath = this.writeManifest(writer,
-                                                              snapshotDirs,
-                                                              done);
-                this.compressJraftSnapshotDir(writer, jraftSnapshotPath, done);
+                try {
+                    this.compressSnapshotDir(writer, snapshotDirMaps);
+                    this.deleteSnapshotDirs(snapshotDirMaps.keySet());
+                    done.run(Status.OK());
+                } catch (Throwable e) {
+                    LOG.error("Failed to compress snapshot", e);
+                    done.run(new Status(RaftError.EIO,
+                                        "Failed to compress snapshot, " +
+                                        "error is %s", e.getMessage()));
+                }
             });
         } catch (Throwable e) {
             LOG.error("Failed to save snapshot", e);
@@ -76,43 +99,38 @@
     }
 
     public boolean load(SnapshotReader reader) {
-        LocalFileMeta meta = (LocalFileMeta) reader.getFileMeta(SNAPSHOT_TAR);
-        String readerPath = reader.getPath();
-        if (meta == null) {
-            LOG.error("Can't find snapshot archive file, path={}", readerPath);
-            return false;
+        Set<String> snapshotDirTars = reader.listFiles();
+        LOG.info("The snapshot tar files to be loaded are {}", snapshotDirTars);
+        Set<String> snapshotDirs = new HashSet<>();
+        for (String snapshotDirTar : snapshotDirTars) {
+            try {
+                String snapshotDir = this.decompressSnapshot(reader,
+                                                             snapshotDirTar);
+                snapshotDirs.add(snapshotDir);
+            } catch (Throwable e) {
+                LOG.error("Failed to decompress snapshot tar", e);
+                return false;
+            }
         }
-        String jraftSnapshotPath = Paths.get(readerPath, SNAPSHOT_DIR)
-                                        .toString();
+
         try {
-            // Decompress manifest and data directory
-            this.decompressSnapshot(readerPath, meta);
             this.doSnapshotLoad();
-            File tmp = new File(jraftSnapshotPath);
-            // Delete the decompressed temporary file. If the deletion fails
-            // (although it is a small probability event), it may affect the
-            // next snapshot decompression result. Therefore, the safest way
-            // is to terminate the state machine immediately. Users can choose
-            // to manually delete and restart according to the log information.
-            if (tmp.exists()) {
-                FileUtils.forceDelete(tmp);
-            }
-            return true;
+            this.deleteSnapshotDirs(snapshotDirs);
         } catch (Throwable e) {
             LOG.error("Failed to load snapshot", e);
             return false;
         }
+        return true;
     }
 
-    private Set<String> doSnapshotSave() {
-        Set<String> snapshotDirs = InsertionOrderUtil.newSet();
+    private Map<String, String> doSnapshotSave() {
+        Map<String, String> snapshotDirMaps = InsertionOrderUtil.newMap();
         for (RaftBackendStore store : this.stores) {
-            Set<String> snapshots = store.originStore()
-                                         .createSnapshot(SNAPSHOT_DIR);
-            snapshotDirs.addAll(snapshots);
+            snapshotDirMaps.putAll(store.originStore()
+                                        .createSnapshot(SNAPSHOT_DIR));
         }
-        LOG.info("Saved all snapshots: {}", snapshotDirs);
-        return snapshotDirs;
+        LOG.info("Saved all snapshots: {}", snapshotDirMaps);
+        return snapshotDirMaps;
     }
 
     private void doSnapshotLoad() {
@@ -121,55 +139,63 @@ private void doSnapshotLoad() {
         }
     }
 
-    private String writeManifest(SnapshotWriter writer,
-                                 Set<String> snapshotFiles,
-                                 Closure done) {
+    private void compressSnapshotDir(SnapshotWriter writer,
+                                     Map<String, String> snapshotDirMaps) {
         String writerPath = writer.getPath();
-        // Write all backend compressed snapshot file path to manifest
-        String jraftSnapshotPath = Paths.get(writerPath, SNAPSHOT_DIR)
-                                        .toString();
-        File snapshotManifestFile = new File(jraftSnapshotPath, MANIFEST);
-        try {
-            FileUtils.writeLines(snapshotManifestFile, snapshotFiles);
-        } catch (IOException e) {
-            done.run(new Status(RaftError.EIO,
-                                "Failed to write backend snapshot file path " +
-                                "to manifest"));
-        }
-        return jraftSnapshotPath;
-    }
+        for (Map.Entry<String, String> entry : snapshotDirMaps.entrySet()) {
+            String snapshotDir = entry.getKey();
+            String diskTableKey = entry.getValue();
+            String snapshotDirTar = Paths.get(snapshotDir).getFileName()
+                                         .toString() + TAR;
+            String outputFile = Paths.get(writerPath, snapshotDirTar)
+                                     .toString();
+            Checksum checksum = new CRC64();
+            try {
+                CompressUtil.compressTar(snapshotDir, outputFile, checksum);
+            } catch (Throwable e) {
+                throw new RaftException(
+                          "Failed to compress snapshot, path=%s, files=%s",
+                          e, writerPath, snapshotDirMaps.keySet());
+            }
 
-    private void compressJraftSnapshotDir(SnapshotWriter writer,
-                                          String jraftSnapshotPath,
-                                          Closure done) {
-        String writerPath = writer.getPath();
-        String outputFile = Paths.get(writerPath, SNAPSHOT_TAR).toString();
-        try {
             LocalFileMeta.Builder metaBuilder = LocalFileMeta.newBuilder();
-            Checksum checksum = new CRC64();
-            CompressUtil.compressTar(jraftSnapshotPath, outputFile, checksum);
             metaBuilder.setChecksum(Long.toHexString(checksum.getValue()));
-            if (writer.addFile(SNAPSHOT_TAR, metaBuilder.build())) {
-                done.run(Status.OK());
-            } else {
-                done.run(new Status(RaftError.EIO,
-                                    "Failed to add snapshot file: '%s'",
-                                    writerPath));
+            /*
+             * snapshot_rocksdb-data.tar -> general
+             * snapshot_rocksdb-vertex.tar -> g/VERTEX
+             */
+            metaBuilder.setUserMeta(ByteString.copyFromUtf8(diskTableKey));
+            if (!writer.addFile(snapshotDirTar, metaBuilder.build())) {
+                throw new RaftException("Failed to add snapshot file: '%s'",
+                                        snapshotDirTar);
             }
-        } catch (Throwable e) {
-            LOG.error("Failed to compress snapshot, path={}, files={}, {}.",
-                      writerPath, writer.listFiles(), e);
-            done.run(new Status(RaftError.EIO,
-                                "Failed to compress snapshot '%s' due to: %s",
-                                writerPath, e.getMessage()));
         }
     }
 
-    private void decompressSnapshot(String readerPath, LocalFileMeta meta)
-                                    throws IOException {
-        String archiveFile = Paths.get(readerPath, SNAPSHOT_TAR).toString();
+    private String decompressSnapshot(SnapshotReader reader,
+                                      String snapshotDirTar)
+                                      throws IOException {
+        LocalFileMeta meta = (LocalFileMeta) reader.getFileMeta(snapshotDirTar);
+        if (meta == null) {
+            throw new IOException("Can't find snapshot archive file, path=" +
+                                  snapshotDirTar);
+        }
+
+        String diskTableKey = meta.getUserMeta().toStringUtf8();
+        E.checkArgument(this.dataDisks.containsKey(diskTableKey),
+                        "The data path for '%s' should exist", diskTableKey);
+        String dataPath = this.dataDisks.get(diskTableKey);
+        String parentPath = Paths.get(dataPath).getParent().toString();
+        String snapshotDir = Paths.get(parentPath,
+                                       StringUtils.removeEnd(snapshotDirTar,
+                                                             TAR))
+                                  .toString();
+        FileUtils.deleteDirectory(new File(snapshotDir));
+        LOG.info("Delete stale snapshot dir {}", snapshotDir);
+
         Checksum checksum = new CRC64();
-        CompressUtil.decompressTar(archiveFile, readerPath, checksum);
+        String archiveFile = Paths.get(reader.getPath(), snapshotDirTar)
+                                  .toString();
+        CompressUtil.decompressTar(archiveFile, parentPath, checksum);
         if (meta.hasChecksum()) {
             String expected = meta.getChecksum();
             String actual = Long.toHexString(checksum.getValue());
@@ -177,5 +203,12 @@ private void decompressSnapshot(String readerPath, LocalFileMeta meta)
                           "Snapshot checksum error: '%s' != '%s'",
                           actual, expected);
         }
+        return snapshotDir;
+    }
+
+    private void deleteSnapshotDirs(Set<String> snapshotDirs) {
+        for (String snapshotDir : snapshotDirs) {
+            FileUtils.deleteQuietly(new File(snapshotDir));
+        }
     }
 }
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java
index 8d61e775a2..3506b6ca18 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java
@@ -48,6 +48,7 @@
 import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests.StoreType;
 import com.baidu.hugegraph.type.HugeType;
 import com.baidu.hugegraph.type.define.GraphMode;
+import com.baidu.hugegraph.util.E;
 import com.baidu.hugegraph.util.LZ4Util;
 import com.baidu.hugegraph.util.Log;
 
@@ -162,7 +163,9 @@ public void onApply(Iterator iter) {
 
     private void applyCommand(StoreType type, StoreAction action,
                               BytesBuffer buffer, boolean forwarded) {
-        BackendStore store = type != StoreType.ALL ? this.store(type) : null;
+        E.checkState(type != StoreType.ALL,
+                     "Can't apply command to all stores at one time");
+        BackendStore store = this.store(type);
         switch (action) {
             case CLEAR:
                 boolean clearSpace = buffer.read() > 0;
diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java
index c8dbfbcbcb..f12ba4c127 100644
--- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java
+++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java
@@ -306,8 +306,8 @@ public String hardLinkSnapshot(String snapshotPath) throws RocksDBException {
                               snapshotPath, null).rocksdb) {
             RocksDBStdSessions.createCheckpoint(rocksdb, snapshotLinkPath);
         }
-        LOG.debug("The snapshot {} has been hard linked to {}",
-                  snapshotPath, snapshotLinkPath);
+        LOG.info("The snapshot {} has been hard linked to {}",
+                 snapshotPath, snapshotLinkPath);
         return snapshotLinkPath;
     }
 
diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java
index 9cd31e530c..d470ec4e6e 100644
--- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java
+++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java
@@ -81,12 +81,14 @@ public abstract class RocksDBStore extends AbstractBackendStore<Session> {
 
     private final BackendStoreProvider provider;
     private final Map<HugeType, RocksDBTable> tables;
+    private String dataPath;
     private RocksDBSessions sessions;
     private final Map<HugeType, String> tableDiskMapping;
     // DataPath:RocksDB mapping
     private final ConcurrentMap<String, RocksDBSessions> dbs;
     private final ReadWriteLock storeLock;
 
+    private static final String TABLE_GENERAL_KEY = "general";
     private static final String DB_OPEN = "db-open-%s";
     private static final long OPEN_TIMEOUT = 600L;
     /*
@@ -166,6 +168,7 @@ public synchronized void open(HugeConfig config) {
         LOG.debug("Store open: {}", this.store);
 
         E.checkNotNull(config, "config");
+        this.dataPath = config.get(RocksDBOptions.DATA_PATH);
 
         if (this.sessions != null && !this.sessions.closed()) {
             LOG.debug("Store {} has been opened before", this.store);
@@ -185,8 +188,7 @@
         Map<String, String> disks = config.getMap(RocksDBOptions.DATA_DISKS);
         Set<String> openedDisks = new HashSet<>();
         if (!disks.isEmpty()) {
-            String dataPath = config.get(RocksDBOptions.DATA_PATH);
-            this.parseTableDiskMapping(disks, dataPath);
+            this.parseTableDiskMapping(disks, this.dataPath);
             for (Entry<HugeType, String> e : this.tableDiskMapping.entrySet()) {
                 String table = this.table(e.getKey()).table();
                 String disk = e.getValue();
@@ -607,11 +609,11 @@ protected Session session(HugeType tableType) {
     }
 
     @Override
-    public Set<String> createSnapshot(String snapshotPrefix) {
+    public Map<String, String> createSnapshot(String snapshotPrefix) {
         Lock readLock = this.storeLock.readLock();
         readLock.lock();
         try {
-            Set<String> uniqueParents = new HashSet<>();
+            Map<String, String> uniqueSnapshotDirMaps = new HashMap<>();
             // Every rocksdb instance should create a snapshot
             for (Map.Entry<String, RocksDBSessions> entry : this.dbs.entrySet()) {
                 // Like: parent_path/rocksdb-data/*, * maybe g,m,s
@@ -627,10 +629,14 @@ public Set<String> createSnapshot(String snapshotPrefix) {
 
                 RocksDBSessions sessions = entry.getValue();
                 sessions.createSnapshot(snapshotPath.toString());
-                uniqueParents.add(snapshotPath.getParent().toString());
+                String snapshotDir = snapshotPath.getParent().toString();
+                // Find the corresponding data HugeType key
+                String diskTableKey = this.findDiskTableKeyByPath(
+                                      entry.getKey());
+                uniqueSnapshotDirMaps.put(snapshotDir, diskTableKey);
             }
             LOG.info("The store '{}' create snapshot successfully", this);
-            return uniqueParents;
+            return uniqueSnapshotDirMaps;
         } finally {
             readLock.unlock();
         }
@@ -742,6 +748,28 @@ private final void parseTableDiskMapping(Map<String, String> disks,
         }
     }
 
+    private Map<String, String> reportDiskMapping() {
+        Map<String, String> diskMapping = new HashMap<>();
+        diskMapping.put(TABLE_GENERAL_KEY, this.dataPath);
+        for (Map.Entry<HugeType, String> e : this.tableDiskMapping.entrySet()) {
+            String key = this.store + "/" + e.getKey().name();
+            String value = Paths.get(e.getValue()).getParent().toString();
+            diskMapping.put(key, value);
+        }
+        return diskMapping;
+    }
+
+    private String findDiskTableKeyByPath(String diskPath) {
+        String diskTableKey = TABLE_GENERAL_KEY;
+        for (Map.Entry<HugeType, String> e : this.tableDiskMapping.entrySet()) {
+            if (diskPath.equals(e.getValue())) {
+                diskTableKey = this.store + "/" + e.getKey().name();
+                break;
+            }
+        }
+        return diskTableKey;
+    }
+
     private final void checkDbOpened() {
         E.checkState(this.sessions != null && !this.sessions.closed(),
                      "RocksDB has not been opened");
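Reviewer note: the sketch below is not part of the patch; it is a minimal illustration of the mapping contract this diff introduces. createSnapshot() now returns snapshot-dir -> disk-table-key pairs, the key travels in each jraft file's user meta, and on load it is resolved back to a restore location beside the matching data disk. The class name SnapshotMappingDemo, the sample paths, and the main driver are hypothetical.

import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: mirrors the save/load mapping logic from
// StoreSnapshotFile and RocksDBStore.reportDiskMapping() above.
public class SnapshotMappingDemo {

    private static final String TAR = ".tar";

    // Like reportDiskMapping(): "general" covers the default data path,
    // "g/VERTEX" a table that was placed on its own disk.
    private static final Map<String, String> DATA_DISKS = new HashMap<>();
    static {
        DATA_DISKS.put("general", "/parent_path/rocksdb-data");
        DATA_DISKS.put("g/VERTEX", "/parent_path/rocksdb-vertex");
    }

    // Save side: each snapshot dir is archived as "<dir-name>.tar";
    // the disk-table key is carried alongside as user meta.
    static String tarNameOf(String snapshotDir) {
        return Paths.get(snapshotDir).getFileName().toString() + TAR;
    }

    // Load side: the user-meta key selects the data disk, and the tar is
    // decompressed into the parent directory of that disk's data path.
    static String restoreDirOf(String tarName, String diskTableKey) {
        String dataPath = DATA_DISKS.get(diskTableKey);
        String parentPath = Paths.get(dataPath).getParent().toString();
        String dirName = tarName.substring(0, tarName.length() - TAR.length());
        return Paths.get(parentPath, dirName).toString();
    }

    public static void main(String[] args) {
        String snapshotDir = "/parent_path/snapshot_rocksdb-vertex";
        String tar = tarNameOf(snapshotDir);
        // Prints: snapshot_rocksdb-vertex.tar -> /parent_path/snapshot_rocksdb-vertex
        System.out.println(tar + " -> " + restoreDirOf(tar, "g/VERTEX"));
    }
}

Carrying the key in per-file user meta, rather than in a single manifest under one snapshot directory as before, is what lets each tar land back next to the disk it was taken from on a multi-disk deployment.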