
Commit 9a4c543
fix cross-close the underlying rocksdb session pool (#598)
fix: #597
Change-Id: I8b185cd7f81a9a04bc6fd971490ae887fd4ddbb5
javeme authored and zhoney committed Jul 9, 2019
1 parent 011ac85 commit 9a4c543
Showing 11 changed files with 104 additions and 28 deletions.
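
The core of the patch is to make a RocksDB session pool shareable by reference counting: copy() hands out another RocksDBSessions view backed by the same RocksDB instance and bumps a shared AtomicInteger, and doClose() only releases the column families and the database once that count reaches zero, so a store closing its pool can no longer cross-close a pool that another store on the same data path is still using. The sketch below shows that pattern in isolation; it is a minimal illustration, not HugeGraph code, and the names SharedPool and NativeHandle are made up for the example.

import java.util.concurrent.atomic.AtomicInteger;

// Illustrative stand-in for a native resource such as an open RocksDB instance.
final class NativeHandle implements AutoCloseable {
    @Override
    public void close() {
        System.out.println("native handle released");
    }
}

// Reference-counted wrapper: copy() hands out another view of the same handle,
// and only the last close() actually releases it.
final class SharedPool implements AutoCloseable {

    private final NativeHandle handle;
    private final AtomicInteger refCount;

    SharedPool() {
        this.handle = new NativeHandle();
        this.refCount = new AtomicInteger(1);
    }

    private SharedPool(SharedPool origin) {
        this.handle = origin.handle;     // share the underlying resource
        this.refCount = origin.refCount; // and the counter
        this.refCount.incrementAndGet();
    }

    SharedPool copy() {
        return new SharedPool(this);
    }

    @Override
    public void close() {
        if (this.refCount.decrementAndGet() > 0) {
            return; // another owner still holds the handle
        }
        this.handle.close(); // last owner really releases it
    }

    public static void main(String[] args) {
        SharedPool first = new SharedPool();
        SharedPool second = first.copy(); // e.g. a second store on the same data path
        first.close();   // no output: second still owns a reference
        second.close();  // prints "native handle released"
    }
}

In the diff itself the same pattern appears as the private RocksDBStdSessions copy constructor that shares rocksdb, sstFileManager, cfs and refCount with the origin, and as the refCount.decrementAndGet() guard added at the top of doClose().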
@@ -465,6 +465,11 @@ public Transaction tx() {

@Override
public void close() throws HugeException {
if (this.closed()) {
return;
}

LOG.info("Close graph {}", this);
this.taskManager.closeScheduler(this);
try {
this.closeTx();
@@ -41,6 +41,9 @@ public RocksDBSessions(HugeConfig config, String database, String store) {

public abstract String property(String property);

public abstract RocksDBSessions copy(HugeConfig config,
String database, String store);

@Override
public abstract Session session();

@@ -29,6 +29,7 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
@@ -63,13 +64,14 @@

public class RocksDBStdSessions extends RocksDBSessions {

- private final Map<String, ColumnFamilyHandle> cfs = new HashMap<>();
-
private final RocksDB rocksdb;
private final SstFileManager sstFileManager;

- public RocksDBStdSessions(HugeConfig config, String dataPath,
- String walPath, String database, String store)
+ private final Map<String, ColumnFamilyHandle> cfs;
+ private final AtomicInteger refCount;
+
+ public RocksDBStdSessions(HugeConfig config, String database, String store,
+ String dataPath, String walPath)
throws RocksDBException {
super(config, database, store);

@@ -86,10 +88,13 @@ public RocksDBStdSessions(HugeConfig config, String dataPath,
* Don't merge old CFs, we expect a clear DB when using this one
*/
this.rocksdb = RocksDB.open(options, dataPath);

this.cfs = new HashMap<>();
this.refCount = new AtomicInteger(1);
}

- public RocksDBStdSessions(HugeConfig config, String dataPath,
- String walPath, String database, String store,
+ public RocksDBStdSessions(HugeConfig config, String database, String store,
+ String dataPath, String walPath,
List<String> cfNames) throws RocksDBException {
super(config, database, store);

Expand Down Expand Up @@ -121,13 +126,28 @@ public RocksDBStdSessions(HugeConfig config, String dataPath,
"Expect same size of cf-handles and cf-names");

// Collect CF Handles
this.cfs = new HashMap<>();
for (int i = 0; i < cfs.size(); i++) {
this.cfs.put(cfs.get(i), cfhs.get(i));
}

this.refCount = new AtomicInteger(1);

ingestExternalFile();
}

private RocksDBStdSessions(HugeConfig config, String database, String store,
RocksDBStdSessions origin) {
super(config, database, store);

this.rocksdb = origin.rocksdb;
this.sstFileManager = origin.sstFileManager;
this.cfs = origin.cfs;
this.refCount = origin.refCount;

this.refCount.incrementAndGet();
}

@Override
public void open() throws Exception {
// pass
@@ -182,6 +202,12 @@ public String property(String property) {
}
}

@Override
public RocksDBSessions copy(HugeConfig config,
String database, String store) {
return new RocksDBStdSessions(config, database, store, this);
}

@Override
public final Session session() {
return (Session) super.getOrNewSession();
@@ -198,6 +224,11 @@ protected final Session newSession() {
protected synchronized void doClose() {
this.checkValid();

if (this.refCount.decrementAndGet() > 0) {
return;
}
assert this.refCount.get() == 0;

for (ColumnFamilyHandle cf : this.cfs.values()) {
cf.close();
}
@@ -92,7 +92,7 @@ public RocksDBStore(final BackendStoreProvider provider,
private void registerMetaHandlers() {
this.registerMetaHandler("metrics", (session, meta, args) -> {
List<RocksDBSessions> dbs = new ArrayList<>();
- dbs.add(sessions);
+ dbs.add(this.sessions);
dbs.addAll(tableDBMapping().values());

RocksDBMetrics metrics = new RocksDBMetrics(dbs, session);
@@ -157,7 +157,8 @@ public synchronized void open(HugeConfig config) {
// Open tables with optimized disk
Map<String, String> disks = config.getMap(RocksDBOptions.DATA_DISKS);
if (!disks.isEmpty()) {
- this.parseTableDiskMapping(disks);
+ String dataPath = config.get(RocksDBOptions.DATA_PATH);
+ this.parseTableDiskMapping(disks, dataPath);
for (Entry<HugeType, String> e : this.tableDiskMapping.entrySet()) {
String table = this.table(e.getKey()).table();
String disk = e.getValue();
@@ -181,10 +182,11 @@ protected RocksDBSessions open(HugeConfig config, String dataPath,
sessions = this.openSessionPool(config, dataPath,
walPath, tableNames);
} catch (RocksDBException e) {
- if (dbs.containsKey(dataPath)) {
+ RocksDBSessions origin = dbs.get(dataPath);
+ if (origin != null) {
if (e.getMessage().contains("No locks available")) {
// Open twice, but we should support keyspace
- sessions = dbs.get(dataPath);
+ sessions = origin.copy(config, this.database, this.store);
}
}

@@ -222,6 +224,7 @@ protected RocksDBSessions open(HugeConfig config, String dataPath,
}

if (sessions != null) {
// May override the original session pool
dbs.put(dataPath, sessions);
sessions.session();
LOG.debug("Store opened: {}", dataPath);
@@ -235,12 +238,11 @@ protected RocksDBSessions openSessionPool(HugeConfig config,
List<String> tableNames)
throws RocksDBException {
if (tableNames == null) {
- return new RocksDBStdSessions(config, dataPath, walPath,
- this.database, this.store);
+ return new RocksDBStdSessions(config, this.database, this.store,
+ dataPath, walPath);
} else {
- return new RocksDBStdSessions(config, dataPath, walPath,
- this.database, this.store,
- tableNames);
+ return new RocksDBStdSessions(config, this.database, this.store,
+ dataPath, walPath, tableNames);
}
}

@@ -363,7 +365,7 @@ public void clear() {

private void dropTable(RocksDBSessions db, String table) {
try {
- this.sessions.dropTable(table);
+ db.dropTable(table);
} catch (BackendException e) {
if (e.getMessage().contains("is not opened")) {
return;
@@ -446,12 +448,15 @@ private void checkOpened() {
this.database, this.provider.type());
}

- private void parseTableDiskMapping(Map<String, String> disks) {
+ private void parseTableDiskMapping(Map<String, String> disks,
+ String dataPath) {
this.tableDiskMapping.clear();
for (Map.Entry<String, String> disk : disks.entrySet()) {
// The format of `disk` like: `graph/vertex: /path/to/disk1`
String name = disk.getKey();
String path = disk.getValue();
+ E.checkArgument(!dataPath.equals(path), "Invalid disk path" +
+ "(can't be the same as data_path): '%s'", path);
E.checkArgument(!name.isEmpty() && !path.isEmpty(),
"Invalid disk format: '%s', expect `NAME:PATH`",
disk);
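The parseTableDiskMapping hunk above now receives the main data_path and rejects any per-table disk equal to it, presumably because a table disk pointing at the store's own data directory would collide with the primary RocksDB instance that the shared session pool already owns. A rough, self-contained sketch of that validation follows; DiskMappingCheck and its exception messages are hypothetical and only approximate the E.checkArgument calls in the patch.

import java.util.HashMap;
import java.util.Map;

// Hypothetical, stand-alone version of the data_disks check: every entry maps
// a "GRAPH/TABLE" name to a dedicated directory, and no entry may reuse the
// store's main data_path. Class name and messages are illustrative only.
public final class DiskMappingCheck {

    static Map<String, String> parse(Map<String, String> disks, String dataPath) {
        Map<String, String> mapping = new HashMap<>();
        for (Map.Entry<String, String> disk : disks.entrySet()) {
            String name = disk.getKey();    // e.g. "graph/secondary_index"
            String path = disk.getValue();  // e.g. "rocksdb-index"
            if (name.isEmpty() || path.isEmpty()) {
                throw new IllegalArgumentException(
                          "Invalid disk format: '" + disk + "', expect `NAME:PATH`");
            }
            if (dataPath.equals(path)) {
                throw new IllegalArgumentException(
                          "Disk path can't be the same as data_path: '" + path + "'");
            }
            mapping.put(name, path);
        }
        return mapping;
    }

    public static void main(String[] args) {
        Map<String, String> disks = new HashMap<>();
        disks.put("graph/secondary_index", "rocksdb-index");
        System.out.println(parse(disks, "rocksdb-data")); // accepted
        // parse(disks, "rocksdb-index") would throw: the path equals data_path
    }
}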
@@ -49,8 +49,8 @@ public class RocksDBSstSessions extends RocksDBSessions {
private final String dataPath;
private final Map<String, SstFileWriter> tables;

- public RocksDBSstSessions(HugeConfig config, String dataPath,
- String database, String store) {
+ public RocksDBSstSessions(HugeConfig config, String database, String store,
+ String dataPath) {
super(config, database, store);

this.dataPath = dataPath;
@@ -71,6 +71,14 @@ public RocksDBSstSessions(HugeConfig config, String dataPath,
}
}

private RocksDBSstSessions(HugeConfig config, String database, String store,
RocksDBSstSessions origin) {
super(config, database, store);

this.dataPath = origin.dataPath;
this.tables = origin.tables;
}

@Override
public void open() throws Exception {
// pass
@@ -110,6 +118,13 @@ public String property(String property) {
throw new NotSupportException("RocksDBSstStore property()");
}

@Override
public RocksDBSessions copy(HugeConfig config,
String database, String store) {
return new RocksDBSstSessions(config, database, store, this);
}


private SstFileWriter table(String table) {
SstFileWriter sst = this.tables.get(table);
if (sst == null) {
@@ -44,11 +44,11 @@ protected RocksDBSessions openSessionPool(HugeConfig config,
List<String> tableNames)
throws RocksDBException {
if (tableNames == null) {
- return new RocksDBSstSessions(config, dataPath, this.database(),
- this.store());
+ return new RocksDBSstSessions(config, this.database(),
+ this.store(), dataPath);
} else {
- return new RocksDBSstSessions(config, dataPath, this.database(),
- this.store(), tableNames);
+ return new RocksDBSstSessions(config, this.database(), this.store(),
+ dataPath, tableNames);
}
}

@@ -2190,6 +2190,9 @@ public void testRemoveEdgesOfSuperVertex() {
Assert.assertTrue(e.getMessage().contains(
"Edges size has reached tx capacity"));
});

// Clear all
graph.truncateBackend();
}

@Test
@@ -108,7 +108,8 @@ public void testCreateGraphsWithSameName() {
g1.clearBackend();
g2.clearBackend();
g3.clearBackend();
- destoryGraphs(ImmutableList.of(g1));
+
+ destoryGraphs(ImmutableList.of(g1, g2, g3));
}

@Test
@@ -697,6 +697,8 @@ public void testAddVertexWithCustomizeNumberIdStrategy() {
.create();
graph.addVertex(T.label, "programmer", T.id, 123456, "name", "marko",
"age", 18, "city", "Beijing");
graph.addVertex(T.label, "programmer", T.id, 61695499031416832L,
"name", "marko", "age", 18, "city", "Beijing");
graph.tx().commit();

List<Vertex> vertices = graph.traversal().V(123456).toList();
@@ -707,6 +709,16 @@ public void testAddVertexWithCustomizeNumberIdStrategy() {
assertContains(vertices,
T.label, "programmer", "name", "marko",
"age", 18, "city", "Beijing");

vertices = graph.traversal().V(61695499031416832L).toList();
Assert.assertEquals(1, vertices.size());
id = vertices.get(0).id();
Assert.assertEquals(IdGenerator.LongId.class, id.getClass());
Assert.assertEquals(61695499031416832L,
((IdGenerator.LongId) id).asLong());
assertContains(vertices,
T.label, "programmer", "name", "marko",
"age", 18, "city", "Beijing");
}

@Test
@@ -111,8 +111,8 @@ protected static long l(byte[] bytes) {

private static RocksDBSessions open(String table) throws RocksDBException {
HugeConfig config = FakeObjects.newConfig();
- RocksDBSessions rocks = new RocksDBStdSessions(config, DB_PATH, DB_PATH,
- "db", "store");
+ RocksDBSessions rocks = new RocksDBStdSessions(config, "db", "store",
+ DB_PATH, DB_PATH);
rocks.createTable(table);
return rocks;
}
hugegraph-test/src/main/resources/hugegraph.properties: 5 changes (3 additions & 2 deletions)
@@ -22,8 +22,9 @@ cassandra.connect_timeout=30
cassandra.read_timeout=120

# rocksdb backend config
- #rocksdb.data_path=
- #rocksdb.wal_path=
+ rocksdb.data_path=rocksdb-data
+ rocksdb.wal_path=rocksdb-data
+ rocksdb.data_disks=[graph/secondary_index:rocksdb-index]

# hbase backend config
hbase.hosts=localhost