From 9a4c543056cc16867242f4a174b430a16d39272f Mon Sep 17 00:00:00 2001 From: Jermy Li Date: Tue, 9 Jul 2019 21:13:59 +0800 Subject: [PATCH] fix cross-close the underlying rocksdb session pool (#598) fix: #597 Change-Id: I8b185cd7f81a9a04bc6fd971490ae887fd4ddbb5 --- .../java/com/baidu/hugegraph/HugeGraph.java | 5 +++ .../store/rocksdb/RocksDBSessions.java | 3 ++ .../store/rocksdb/RocksDBStdSessions.java | 43 ++++++++++++++++--- .../backend/store/rocksdb/RocksDBStore.java | 27 +++++++----- .../store/rocksdbsst/RocksDBSstSessions.java | 19 +++++++- .../store/rocksdbsst/RocksDBSstStore.java | 8 ++-- .../baidu/hugegraph/core/EdgeCoreTest.java | 3 ++ .../baidu/hugegraph/core/MultiGraphsTest.java | 3 +- .../baidu/hugegraph/core/VertexCoreTest.java | 12 ++++++ .../unit/rocksdb/BaseRocksDBUnitTest.java | 4 +- .../src/main/resources/hugegraph.properties | 5 ++- 11 files changed, 104 insertions(+), 28 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java index 3f00f4daf9..08046c112d 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/HugeGraph.java @@ -465,6 +465,11 @@ public Transaction tx() { @Override public void close() throws HugeException { + if (this.closed()) { + return; + } + + LOG.info("Close graph {}", this); this.taskManager.closeScheduler(this); try { this.closeTx(); diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java index a55eefa90d..d60d5b35a6 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBSessions.java @@ -41,6 +41,9 @@ public RocksDBSessions(HugeConfig config, String database, String store) { public abstract String property(String property); + public abstract RocksDBSessions copy(HugeConfig config, + String database, String store); + @Override public abstract Session session(); diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java index 598a307ce6..8c8a395a43 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.BloomFilter; @@ -63,13 +64,14 @@ public class RocksDBStdSessions extends RocksDBSessions { - private final Map cfs = new HashMap<>(); - private final RocksDB rocksdb; private final SstFileManager sstFileManager; - public RocksDBStdSessions(HugeConfig config, String dataPath, - String walPath, String database, String store) + private final Map cfs; + private final AtomicInteger refCount; + + public RocksDBStdSessions(HugeConfig config, String database, String store, + String dataPath, String walPath) throws RocksDBException { super(config, database, store); @@ -86,10 +88,13 @@ public RocksDBStdSessions(HugeConfig config, String dataPath, * Don't merge old CFs, we expect a clear DB when using this one */ this.rocksdb = RocksDB.open(options, dataPath); + + this.cfs = new HashMap<>(); + this.refCount = new AtomicInteger(1); } - public RocksDBStdSessions(HugeConfig config, String dataPath, - String walPath, String database, String store, + public RocksDBStdSessions(HugeConfig config, String database, String store, + String dataPath, String walPath, List cfNames) throws RocksDBException { super(config, database, store); @@ -121,13 +126,28 @@ public RocksDBStdSessions(HugeConfig config, String dataPath, "Expect same size of cf-handles and cf-names"); // Collect CF Handles + this.cfs = new HashMap<>(); for (int i = 0; i < cfs.size(); i++) { this.cfs.put(cfs.get(i), cfhs.get(i)); } + this.refCount = new AtomicInteger(1); + ingestExternalFile(); } + private RocksDBStdSessions(HugeConfig config, String database, String store, + RocksDBStdSessions origin) { + super(config, database, store); + + this.rocksdb = origin.rocksdb; + this.sstFileManager = origin.sstFileManager; + this.cfs = origin.cfs; + this.refCount = origin.refCount; + + this.refCount.incrementAndGet(); + } + @Override public void open() throws Exception { // pass @@ -182,6 +202,12 @@ public String property(String property) { } } + @Override + public RocksDBSessions copy(HugeConfig config, + String database, String store) { + return new RocksDBStdSessions(config, database, store, this); + } + @Override public final Session session() { return (Session) super.getOrNewSession(); @@ -198,6 +224,11 @@ protected final Session newSession() { protected synchronized void doClose() { this.checkValid(); + if (this.refCount.decrementAndGet() > 0) { + return; + } + assert this.refCount.get() == 0; + for (ColumnFamilyHandle cf : this.cfs.values()) { cf.close(); } diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java index c43e824a11..8f6468390c 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStore.java @@ -92,7 +92,7 @@ public RocksDBStore(final BackendStoreProvider provider, private void registerMetaHandlers() { this.registerMetaHandler("metrics", (session, meta, args) -> { List dbs = new ArrayList<>(); - dbs.add(sessions); + dbs.add(this.sessions); dbs.addAll(tableDBMapping().values()); RocksDBMetrics metrics = new RocksDBMetrics(dbs, session); @@ -157,7 +157,8 @@ public synchronized void open(HugeConfig config) { // Open tables with optimized disk Map disks = config.getMap(RocksDBOptions.DATA_DISKS); if (!disks.isEmpty()) { - this.parseTableDiskMapping(disks); + String dataPath = config.get(RocksDBOptions.DATA_PATH); + this.parseTableDiskMapping(disks, dataPath); for (Entry e : this.tableDiskMapping.entrySet()) { String table = this.table(e.getKey()).table(); String disk = e.getValue(); @@ -181,10 +182,11 @@ protected RocksDBSessions open(HugeConfig config, String dataPath, sessions = this.openSessionPool(config, dataPath, walPath, tableNames); } catch (RocksDBException e) { - if (dbs.containsKey(dataPath)) { + RocksDBSessions origin = dbs.get(dataPath); + if (origin != null) { if (e.getMessage().contains("No locks available")) { // Open twice, but we should support keyspace - sessions = dbs.get(dataPath); + sessions = origin.copy(config, this.database, this.store); } } @@ -222,6 +224,7 @@ protected RocksDBSessions open(HugeConfig config, String dataPath, } if (sessions != null) { + // May override the original session pool dbs.put(dataPath, sessions); sessions.session(); LOG.debug("Store opened: {}", dataPath); @@ -235,12 +238,11 @@ protected RocksDBSessions openSessionPool(HugeConfig config, List tableNames) throws RocksDBException { if (tableNames == null) { - return new RocksDBStdSessions(config, dataPath, walPath, - this.database, this.store); + return new RocksDBStdSessions(config, this.database, this.store, + dataPath, walPath); } else { - return new RocksDBStdSessions(config, dataPath, walPath, - this.database, this.store, - tableNames); + return new RocksDBStdSessions(config, this.database, this.store, + dataPath, walPath, tableNames); } } @@ -363,7 +365,7 @@ public void clear() { private void dropTable(RocksDBSessions db, String table) { try { - this.sessions.dropTable(table); + db.dropTable(table); } catch (BackendException e) { if (e.getMessage().contains("is not opened")) { return; @@ -446,12 +448,15 @@ private void checkOpened() { this.database, this.provider.type()); } - private void parseTableDiskMapping(Map disks) { + private void parseTableDiskMapping(Map disks, + String dataPath) { this.tableDiskMapping.clear(); for (Map.Entry disk : disks.entrySet()) { // The format of `disk` like: `graph/vertex: /path/to/disk1` String name = disk.getKey(); String path = disk.getValue(); + E.checkArgument(!dataPath.equals(path), "Invalid disk path" + + "(can't be the same as data_path): '%s'", path); E.checkArgument(!name.isEmpty() && !path.isEmpty(), "Invalid disk format: '%s', expect `NAME:PATH`", disk); diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java index c9599b5a96..5dbb3f2ec6 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java @@ -49,8 +49,8 @@ public class RocksDBSstSessions extends RocksDBSessions { private final String dataPath; private final Map tables; - public RocksDBSstSessions(HugeConfig config, String dataPath, - String database, String store) { + public RocksDBSstSessions(HugeConfig config, String database, String store, + String dataPath) { super(config, database, store); this.dataPath = dataPath; @@ -71,6 +71,14 @@ public RocksDBSstSessions(HugeConfig config, String dataPath, } } + private RocksDBSstSessions(HugeConfig config, String database, String store, + RocksDBSstSessions origin) { + super(config, database, store); + + this.dataPath = origin.dataPath; + this.tables = origin.tables; + } + @Override public void open() throws Exception { // pass @@ -110,6 +118,13 @@ public String property(String property) { throw new NotSupportException("RocksDBSstStore property()"); } + @Override + public RocksDBSessions copy(HugeConfig config, + String database, String store) { + return new RocksDBSstSessions(config, database, store, this); + } + + private SstFileWriter table(String table) { SstFileWriter sst = this.tables.get(table); if (sst == null) { diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstStore.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstStore.java index 83453e692b..45d0aede8b 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstStore.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdbsst/RocksDBSstStore.java @@ -44,11 +44,11 @@ protected RocksDBSessions openSessionPool(HugeConfig config, List tableNames) throws RocksDBException { if (tableNames == null) { - return new RocksDBSstSessions(config, dataPath, this.database(), - this.store()); + return new RocksDBSstSessions(config, this.database(), + this.store(), dataPath); } else { - return new RocksDBSstSessions(config, dataPath, this.database(), - this.store(), tableNames); + return new RocksDBSstSessions(config, this.database(), this.store(), + dataPath, tableNames); } } diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java index 5a8c9d34f3..79b8fd248e 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java @@ -2190,6 +2190,9 @@ public void testRemoveEdgesOfSuperVertex() { Assert.assertTrue(e.getMessage().contains( "Edges size has reached tx capacity")); }); + + // Clear all + graph.truncateBackend(); } @Test diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/MultiGraphsTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/MultiGraphsTest.java index c8f8a975cc..eab1a93bb2 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/MultiGraphsTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/MultiGraphsTest.java @@ -108,7 +108,8 @@ public void testCreateGraphsWithSameName() { g1.clearBackend(); g2.clearBackend(); g3.clearBackend(); - destoryGraphs(ImmutableList.of(g1)); + + destoryGraphs(ImmutableList.of(g1, g2, g3)); } @Test diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/VertexCoreTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/VertexCoreTest.java index 45827cb21d..59058c0a9b 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/VertexCoreTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/VertexCoreTest.java @@ -697,6 +697,8 @@ public void testAddVertexWithCustomizeNumberIdStrategy() { .create(); graph.addVertex(T.label, "programmer", T.id, 123456, "name", "marko", "age", 18, "city", "Beijing"); + graph.addVertex(T.label, "programmer", T.id, 61695499031416832L, + "name", "marko", "age", 18, "city", "Beijing"); graph.tx().commit(); List vertices = graph.traversal().V(123456).toList(); @@ -707,6 +709,16 @@ public void testAddVertexWithCustomizeNumberIdStrategy() { assertContains(vertices, T.label, "programmer", "name", "marko", "age", 18, "city", "Beijing"); + + vertices = graph.traversal().V(61695499031416832L).toList(); + Assert.assertEquals(1, vertices.size()); + id = vertices.get(0).id(); + Assert.assertEquals(IdGenerator.LongId.class, id.getClass()); + Assert.assertEquals(61695499031416832L, + ((IdGenerator.LongId) id).asLong()); + assertContains(vertices, + T.label, "programmer", "name", "marko", + "age", 18, "city", "Beijing"); } @Test diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/rocksdb/BaseRocksDBUnitTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/rocksdb/BaseRocksDBUnitTest.java index 0b114af9f2..907ff97dff 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/rocksdb/BaseRocksDBUnitTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/unit/rocksdb/BaseRocksDBUnitTest.java @@ -111,8 +111,8 @@ protected static long l(byte[] bytes) { private static RocksDBSessions open(String table) throws RocksDBException { HugeConfig config = FakeObjects.newConfig(); - RocksDBSessions rocks = new RocksDBStdSessions(config, DB_PATH, DB_PATH, - "db", "store"); + RocksDBSessions rocks = new RocksDBStdSessions(config, "db", "store", + DB_PATH, DB_PATH); rocks.createTable(table); return rocks; } diff --git a/hugegraph-test/src/main/resources/hugegraph.properties b/hugegraph-test/src/main/resources/hugegraph.properties index 9dbc8ecc97..7bf03f16fd 100644 --- a/hugegraph-test/src/main/resources/hugegraph.properties +++ b/hugegraph-test/src/main/resources/hugegraph.properties @@ -22,8 +22,9 @@ cassandra.connect_timeout=30 cassandra.read_timeout=120 # rocksdb backend config -#rocksdb.data_path= -#rocksdb.wal_path= +rocksdb.data_path=rocksdb-data +rocksdb.wal_path=rocksdb-data +rocksdb.data_disks=[graph/secondary_index:rocksdb-index] # hbase backend config hbase.hosts=localhost