diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java b/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java index 0b45f7fbc5..e655b7c02e 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java @@ -23,6 +23,7 @@ import org.apache.hugegraph.backend.store.ram.RamTable; import org.apache.hugegraph.backend.tx.GraphTransaction; import org.apache.hugegraph.backend.tx.SchemaTransaction; +import org.apache.hugegraph.job.EphemeralJob; import org.apache.hugegraph.task.ServerInfoManager; import org.apache.hugegraph.type.define.GraphMode; import org.apache.hugegraph.type.define.GraphReadMode; @@ -90,4 +91,6 @@ public interface HugeGraphParams { RateLimiter readRateLimiter(); RamTable ramtable(); + + void submitEphemeralJob(EphemeralJob<?> job); } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java b/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java index c1d0108f7c..42a67158ed 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java @@ -51,6 +51,7 @@ import org.apache.hugegraph.backend.store.raft.RaftBackendStoreProvider; import org.apache.hugegraph.backend.store.raft.RaftGroupManager; import org.apache.hugegraph.backend.store.ram.RamTable; +import org.apache.hugegraph.task.EphemeralJobQueue; import org.apache.hugegraph.backend.tx.GraphTransaction; import org.apache.hugegraph.backend.tx.SchemaTransaction; import org.apache.hugegraph.config.CoreOptions; @@ -60,6 +61,7 @@ import org.apache.hugegraph.event.EventListener; import org.apache.hugegraph.exception.NotAllowException; import org.apache.hugegraph.io.HugeGraphIoRegistry; +import org.apache.hugegraph.job.EphemeralJob; import org.apache.hugegraph.masterelection.ClusterRoleStore; import org.apache.hugegraph.masterelection.Config; import org.apache.hugegraph.masterelection.RoleElectionConfig; @@ -1163,6 +1165,7 @@ private void waitUntilAllTasksCompleted() { private class StandardHugeGraphParams implements HugeGraphParams { private HugeGraph graph = StandardHugeGraph.this; + private final EphemeralJobQueue ephemeralJobQueue = new EphemeralJobQueue(this); private void graph(HugeGraph graph) { this.graph = graph; } @@ -1304,6 +1307,11 @@ public RateLimiter readRateLimiter() { public RamTable ramtable() { return StandardHugeGraph.this.ramtable; } + + @Override + public void submitEphemeralJob(EphemeralJob<?> job) { + this.ephemeralJobQueue.add(job); + } } private class TinkerPopTransaction extends AbstractThreadLocalTransaction { diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java index 6cf08f1e58..7c8d604583 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java @@ -164,7 +164,7 @@ private void compressSnapshotDir(SnapshotWriter writer, Map<String, String> snap try { LOG.info("Prepare to compress dir '{}' to '{}'", snapshotDir, outputFile); long begin = System.currentTimeMillis(); - String rootDir = Paths.get(snapshotDir).getParent().toString(); + String rootDir = Paths.get(snapshotDir).toAbsolutePath().getParent().toString(); String sourceDir = 
Paths.get(snapshotDir).getFileName().toString(); CompressStrategyManager.getDefault() .compressZip(rootDir, sourceDir, outputFile, checksum); @@ -200,7 +200,7 @@ private String decompressSnapshot(SnapshotReader reader, E.checkArgument(this.dataDisks.containsKey(diskTableKey), "The data path for '%s' should be exist", diskTableKey); String dataPath = this.dataDisks.get(diskTableKey); - String parentPath = Paths.get(dataPath).getParent().toString(); + String parentPath = Paths.get(dataPath).toAbsolutePath().getParent().toString(); String snapshotDir = Paths.get(parentPath, StringUtils.removeEnd(snapshotDirTar, TAR)) .toString(); FileUtils.deleteDirectory(new File(snapshotDir)); diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/RpcForwarder.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/RpcForwarder.java index 389a2ecf44..b6809e6df4 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/RpcForwarder.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/RpcForwarder.java @@ -77,7 +77,14 @@ public void forwardToLeader(PeerId leaderId, StoreCommand command, public void setResponse(StoreCommandResponse response) { if (response.getStatus()) { LOG.debug("StoreCommandResponse status ok"); - future.complete(Status.OK(), () -> null); + // Forward the request to the raft leader and treat a successful forward as a + // successful operation. A RaftClosure is returned because the calling logic + // expects a RaftClosure result: when the current node itself is the leader, + // the command executes locally and notifies the caller asynchronously via a + // RaftClosure, so the forwarding path returns the same type here. 
+ RaftClosure<Object> supplierFuture = new RaftClosure<>(); + supplierFuture.complete(Status.OK()); + future.complete(Status.OK(), () -> supplierFuture); } else { LOG.debug("StoreCommandResponse status error"); Status status = new Status(RaftError.UNKNOWN, diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java index f4ff8b32ec..bc0bc0be11 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java @@ -36,6 +36,7 @@ import org.apache.hugegraph.backend.page.PageState; import org.apache.hugegraph.backend.store.BackendEntry; import org.apache.hugegraph.backend.store.BackendStore; +import org.apache.hugegraph.task.EphemeralJobQueue; import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator; @@ -69,7 +70,6 @@ import org.apache.hugegraph.exception.NotSupportException; import org.apache.hugegraph.iterator.Metadatable; import org.apache.hugegraph.job.EphemeralJob; -import org.apache.hugegraph.job.EphemeralJobBuilder; import org.apache.hugegraph.job.system.DeleteExpiredJob; import org.apache.hugegraph.perf.PerfUtil.Watched; import org.apache.hugegraph.schema.IndexLabel; @@ -81,7 +81,6 @@ import org.apache.hugegraph.structure.HugeIndex.IdWithExpiredTime; import org.apache.hugegraph.structure.HugeProperty; import org.apache.hugegraph.structure.HugeVertex; -import org.apache.hugegraph.task.HugeTask; import org.apache.hugegraph.type.HugeType; import org.apache.hugegraph.type.define.Action; import org.apache.hugegraph.type.define.HugeKeys; @@ -115,15 +114,11 @@ public GraphIndexTransaction(HugeGraphParams graph, BackendStore store) { conf.get(CoreOptions.QUERY_INDEX_INTERSECT_THRESHOLD); } - protected Id asyncRemoveIndexLeft(ConditionQuery query, - HugeElement element) { + protected void asyncRemoveIndexLeft(ConditionQuery query, + HugeElement element) { LOG.info("Remove left index: {}, query: {}", element, query); RemoveLeftIndexJob job = new RemoveLeftIndexJob(query, element); - HugeTask<Object> task = EphemeralJobBuilder.of(this.graph()) - .name(element.id().asString()) - .job(job) - .schedule(); - return task.id(); + this.params().submitEphemeralJob(job); } @Watched(prefix = "index") @@ -1717,7 +1712,8 @@ private static Query parent(Collection<ConditionQuery> queries) { } } - public static class RemoveLeftIndexJob extends EphemeralJob<Object> { + public static class RemoveLeftIndexJob extends EphemeralJob<Long> + implements EphemeralJobQueue.Reduce<Long> { private static final String REMOVE_LEFT_INDEX = "remove_left_index"; @@ -1741,7 +1737,7 @@ public String type() { } @Override - public Object execute() { + public Long execute() { this.tx = this.element.schemaLabel().system() ? 
this.params().systemTransaction().indexTransaction() : this.params().graphTransaction().indexTransaction(); @@ -1780,7 +1776,6 @@ protected long removeIndexLeft(ConditionQuery query, // Process secondary index or search index sCount += this.processSecondaryOrSearchIndexLeft(cq, element); } - this.tx.commit(); return rCount + sCount; } @@ -1808,7 +1803,6 @@ private long processRangeIndexLeft(ConditionQuery query, } // Remove LeftIndex after constructing remove job this.query.removeElementLeftIndex(element.id()); - this.tx.commit(); return count; } @@ -1873,11 +1867,9 @@ private long processSecondaryOrSearchIndexLeft(ConditionQuery query, */ this.tx.updateIndex(il.id(), element, false); } - this.tx.commit(); if (this.deletedByError(element, incorrectIndexFields, incorrectPKs)) { this.tx.updateIndex(il.id(), deletion, false); - this.tx.commit(); } else { count++; } @@ -1949,5 +1941,18 @@ private HugeElement newestElement(HugeElement element) { return (HugeEdge) QueryResults.one(iter); } } + + @Override + public Long reduce(Long t1, Long t2) { + if (t1 == null) { + return t2; + } + + if (t2 == null) { + return t1; + } + + return t1 + t2; + } } } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/task/EphemeralJobQueue.java b/hugegraph-core/src/main/java/org/apache/hugegraph/task/EphemeralJobQueue.java new file mode 100644 index 0000000000..70f49073de --- /dev/null +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/task/EphemeralJobQueue.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hugegraph.task; + +import java.lang.ref.WeakReference; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hugegraph.HugeGraphParams; +import org.apache.hugegraph.backend.query.Query; +import org.apache.hugegraph.backend.tx.GraphTransaction; +import org.apache.hugegraph.job.EphemeralJob; +import org.apache.hugegraph.job.EphemeralJobBuilder; +import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; + +public class EphemeralJobQueue { + + private static final Logger LOG = Log.logger(EphemeralJobQueue.class); + + private static final long CAPACITY = 100 * Query.COMMIT_BATCH; + + private final BlockingQueue<EphemeralJob<?>> pendingQueue; + + private final AtomicReference<State> state; + + private final HugeGraphParams graph; + + private enum State { + INIT, + EXECUTE, + } + + public EphemeralJobQueue(HugeGraphParams graph) { + this.state = new AtomicReference<>(State.INIT); + this.graph = graph; + this.pendingQueue = new ArrayBlockingQueue<>((int) CAPACITY); + } + + public boolean add(EphemeralJob<?> job) { + if (job == null) { + return false; + } + + if (!this.pendingQueue.offer(job)) { + LOG.warn("The pending queue of EphemeralJobQueue is full, {} job " + + "will be ignored", job.type()); + return false; + } + + this.reScheduleIfNeeded(); + return true; + } + + protected HugeGraphParams params() { + return this.graph; + } + + protected void clear() { + this.pendingQueue.clear(); + } + + protected EphemeralJob<?> poll() { + return this.pendingQueue.poll(); + } + + public void consumeComplete() { + this.state.compareAndSet(State.EXECUTE, State.INIT); + } + + public void reScheduleIfNeeded() { + if (this.state.compareAndSet(State.INIT, State.EXECUTE)) { + try { + BatchEphemeralJob job = new BatchEphemeralJob(this); + EphemeralJobBuilder.of(this.graph.graph()) + .name("batch-ephemeral-job") + .job(job) + .schedule(); + } catch (Throwable e) { + // If scheduling fails, consider clearing all pending data in the queue, + // or starting a scheduled task that retries until it succeeds. 
+ LOG.warn("Failed to schedule BatchEphemeralJob", e); + this.pendingQueue.clear(); + this.state.compareAndSet(State.EXECUTE, State.INIT); + } + } + } + + public boolean isEmpty() { + return this.pendingQueue.isEmpty(); + } + + public static class BatchEphemeralJob extends EphemeralJob { + + private static final long PAGE_SIZE = Query.COMMIT_BATCH; + private static final String BATCH_EPHEMERAL_JOB = "batch-ephemeral-job"; + private static final long MAX_CONSUME_COUNT = 2 * PAGE_SIZE; + + private WeakReference queueWeakReference; + + public BatchEphemeralJob(EphemeralJobQueue queue) { + this.queueWeakReference = new WeakReference<>(queue); + } + + @Override + public String type() { + return BATCH_EPHEMERAL_JOB; + } + + @Override + public Object execute() throws Exception { + boolean stop = false; + Object result = null; + int consumeCount = 0; + InterruptedException interruptedException = null; + EphemeralJobQueue queue; + List> batchJobs = new ArrayList<>(); + while (!stop) { + if (interruptedException == null && Thread.currentThread().isInterrupted()) { + interruptedException = new InterruptedException(); + } + + queue = this.queueWeakReference.get(); + if (queue == null) { + stop = true; + continue; + } + + if (queue.isEmpty() || consumeCount > MAX_CONSUME_COUNT || + interruptedException != null) { + queue.consumeComplete(); + stop = true; + if (!queue.isEmpty()) { + queue.reScheduleIfNeeded(); + } + continue; + } + + try { + while (!queue.isEmpty() && batchJobs.size() < PAGE_SIZE) { + EphemeralJob job = queue.poll(); + if (job == null) { + continue; + } + batchJobs.add(job); + } + + if (batchJobs.isEmpty()) { + continue; + } + + consumeCount += batchJobs.size(); + result = this.executeBatchJob(batchJobs, result); + + } catch (InterruptedException e) { + interruptedException = e; + } finally { + batchJobs.clear(); + } + } + + if (interruptedException != null) { + Thread.currentThread().interrupt(); + throw interruptedException; + } + + return result; + } + + private Object executeBatchJob(List> jobs, Object prevResult) throws Exception { + GraphTransaction graphTx = this.params().systemTransaction(); + GraphTransaction systemTx = this.params().graphTransaction(); + Object result = prevResult; + for (EphemeralJob job : jobs) { + this.initJob(job); + Object obj = job.call(); + if (job instanceof Reduce) { + result = ((Reduce) job).reduce(result, obj); + } + } + + graphTx.commit(); + systemTx.commit(); + + return result; + } + + private void initJob(EphemeralJob job) { + job.graph(this.graph()); + job.params(this.params()); + } + + @Override + public Object call() throws Exception { + try { + return super.call(); + } catch (Throwable e) { + LOG.warn("Failed to execute BatchEphemeralJob", e); + EphemeralJobQueue queue = this.queueWeakReference.get(); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + if (queue != null) { + queue.clear(); + queue.consumeComplete(); + } + throw e; + } + + if (queue != null) { + queue.consumeComplete(); + if (!queue.isEmpty()) { + queue.reScheduleIfNeeded(); + } + } + throw e; + } + } + } + + public interface Reduce { + T reduce(T t1, T t2); + } +} diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/util/CompressUtil.java b/hugegraph-core/src/main/java/org/apache/hugegraph/util/CompressUtil.java index 2a01ba6a04..0f5c179f47 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/util/CompressUtil.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/util/CompressUtil.java @@ -145,7 +145,7 @@ public static 
void decompressTar(String sourceFile, String outputDir, Files.createDirectories(newPath); } else { // check parent folder again - Path parent = newPath.getParent(); + Path parent = newPath.toAbsolutePath().getParent(); if (parent != null) { if (Files.notExists(parent)) { Files.createDirectories(parent); @@ -176,7 +176,7 @@ private static Path zipSlipProtect(ArchiveEntry entry, Path targetDir) public static void compressZip(String inputDir, String outputFile, Checksum checksum) throws IOException { - String rootDir = Paths.get(inputDir).getParent().toString(); + String rootDir = Paths.get(inputDir).toAbsolutePath().getParent().toString(); String sourceDir = Paths.get(inputDir).getFileName().toString(); compressZip(rootDir, sourceDir, outputFile, checksum); } diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java index 3ae6ba3fea..91e02878aa 100644 --- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java +++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java @@ -99,7 +99,7 @@ public long totalSize() { } public void createCheckpoint(String targetPath) { - Path parentName = Paths.get(targetPath).getParent().getFileName(); + Path parentName = Paths.get(targetPath).toAbsolutePath().getParent().getFileName(); assert parentName.toString().startsWith("snapshot") : targetPath; // https://github.com/facebook/rocksdb/wiki/Checkpoints try (Checkpoint checkpoint = Checkpoint.create(this.rocksdb)) { diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java index 71a66906dd..bcbe37b7c3 100644 --- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java +++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java @@ -295,9 +295,9 @@ public void resumeSnapshot(String snapshotPath) { public String buildSnapshotPath(String snapshotPrefix) { // Like: parent_path/rocksdb-data/*, * can be g,m,s Path originDataPath = Paths.get(this.dataPath); - Path parentParentPath = originDataPath.getParent().getParent(); + Path parentParentPath = originDataPath.toAbsolutePath().getParent().getParent(); // Like: rocksdb-data/* - Path pureDataPath = parentParentPath.relativize(originDataPath); + Path pureDataPath = parentParentPath.relativize(originDataPath.toAbsolutePath()); // Like: parent_path/snapshot_rocksdb-data/* Path snapshotPath = parentParentPath.resolve(snapshotPrefix + "_" + pureDataPath); diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java index 4158c7d832..2dba5fa766 100644 --- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java +++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java @@ -729,7 +729,7 @@ public Map<String, String> createSnapshot(String snapshotPrefix) { for (Map.Entry<String, RocksDBSessions> entry : this.dbs.entrySet()) { // Like: parent_path/rocksdb-data/*, * maybe g,m,s Path originDataPath = Paths.get(entry.getKey()).toAbsolutePath(); - Path parentParentPath = originDataPath.getParent().getParent(); + Path parentParentPath = 
originDataPath.toAbsolutePath().getParent().getParent(); // Like: rocksdb-data/* Path pureDataPath = parentParentPath.relativize(originDataPath); // Like: parent_path/snapshot_rocksdb-data/* @@ -740,7 +740,7 @@ public Map<String, String> createSnapshot(String snapshotPrefix) { RocksDBSessions sessions = entry.getValue(); sessions.createSnapshot(snapshotPath.toString()); - String snapshotDir = snapshotPath.getParent().toString(); + String snapshotDir = snapshotPath.toAbsolutePath().getParent().toString(); // Find correspond data HugeType key String diskTableKey = this.findDiskTableKeyByPath( entry.getKey()); @@ -781,7 +781,7 @@ public void resumeSnapshot(String snapshotPrefix, boolean deleteSnapshot) { if (deleteSnapshot) { // Delete empty snapshot parent directory - Path parentPath = Paths.get(snapshotPath).getParent(); + Path parentPath = Paths.get(snapshotPath).toAbsolutePath().getParent(); if (Files.list(parentPath).count() == 0) { FileUtils.deleteDirectory(parentPath.toFile()); } @@ -866,7 +866,7 @@ private Map<String, String> reportDiskMapping() { diskMapping.put(TABLE_GENERAL_KEY, this.dataPath); for (Map.Entry<HugeType, String> e : this.tableDiskMapping.entrySet()) { String key = this.store + "/" + e.getKey().name(); - String value = Paths.get(e.getValue()).getParent().toString(); + String value = Paths.get(e.getValue()).toAbsolutePath().getParent().toString(); diskMapping.put(key, value); } return diskMapping; diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java index 393cb2ef13..3d2b7f867a 100644 --- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java +++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java @@ -108,7 +108,7 @@ private void createTable(String table) throws RocksDBException { Path sstFile = Paths.get(this.dataPath, table, number + RocksDBIngester.SST); try { - FileUtils.forceMkdir(sstFile.getParent().toFile()); + FileUtils.forceMkdir(sstFile.toAbsolutePath().getParent().toFile()); } catch (IOException e) { throw new BackendException("Can't make directory for sst: '%s'", e, sstFile.toString());
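
For reference, a minimal sketch of how the new queue is meant to be driven end to end (the CountingJob class and the params variable below are hypothetical, not part of this patch): a job extends EphemeralJob<T>, optionally implements EphemeralJobQueue.Reduce<T> so that BatchEphemeralJob can fold together the results of jobs executed in the same batch, and is submitted through HugeGraphParams.submitEphemeralJob() instead of the old per-job EphemeralJobBuilder.schedule() path:

import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.job.EphemeralJob;
import org.apache.hugegraph.task.EphemeralJobQueue;

// Hypothetical job: returns a partial count and merges counts null-safely,
// mirroring RemoveLeftIndexJob.reduce() above.
public class CountingJob extends EphemeralJob<Long>
                         implements EphemeralJobQueue.Reduce<Long> {

    @Override
    public String type() {
        return "counting-job";
    }

    @Override
    public Long execute() {
        // Real work goes here; return this job's partial result
        return 1L;
    }

    @Override
    public Long reduce(Long t1, Long t2) {
        // Null-safe sum: either side may be null on the first reduction
        if (t1 == null) {
            return t2;
        }
        if (t2 == null) {
            return t1;
        }
        return t1 + t2;
    }
}

// Submission enqueues the job; BatchEphemeralJob then drains up to
// Query.COMMIT_BATCH jobs per round and commits once per batch:
// params.submitEphemeralJob(new CountingJob());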