Skip to content

Commit

Permalink
fix cross-close the underlying rocksdb session pool
Browse files Browse the repository at this point in the history
fix: #597
Change-Id: I8b185cd7f81a9a04bc6fd971490ae887fd4ddbb5
  • Loading branch information
javeme committed Jul 1, 2019
1 parent b9763c6 commit 29d0632
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ public Transaction tx() {

@Override
public void close() throws HugeException {
LOG.info("Close graph {}", this);
this.taskManager.closeScheduler(this);
try {
this.closeTx();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public RocksDBSessions(HugeConfig config, String database, String store) {

public abstract String property(String property);

public abstract RocksDBSessions copy(HugeConfig config,
String database, String store);

@Override
public abstract Session session();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
Expand Down Expand Up @@ -62,13 +63,14 @@

public class RocksDBStdSessions extends RocksDBSessions {

private final Map<String, ColumnFamilyHandle> cfs = new HashMap<>();

private final RocksDB rocksdb;
private final SstFileManager sstFileManager;

public RocksDBStdSessions(HugeConfig config, String dataPath,
String walPath, String database, String store)
private final Map<String, ColumnFamilyHandle> cfs;
private final AtomicInteger refCount;

public RocksDBStdSessions(HugeConfig config, String database, String store,
String dataPath,String walPath)
throws RocksDBException {
super(config, database, store);

Expand All @@ -85,10 +87,13 @@ public RocksDBStdSessions(HugeConfig config, String dataPath,
* Don't merge old CFs, we expect a clear DB when using this one
*/
this.rocksdb = RocksDB.open(options, dataPath);

this.cfs = new HashMap<>();
this.refCount = new AtomicInteger(1);
}

public RocksDBStdSessions(HugeConfig config, String dataPath,
String walPath, String database, String store,
public RocksDBStdSessions(HugeConfig config, String database, String store,
String dataPath, String walPath,
List<String> cfNames) throws RocksDBException {
super(config, database, store);

Expand Down Expand Up @@ -120,13 +125,28 @@ public RocksDBStdSessions(HugeConfig config, String dataPath,
"Expect same size of cf-handles and cf-names");

// Collect CF Handles
this.cfs = new HashMap<>();
for (int i = 0; i < cfs.size(); i++) {
this.cfs.put(cfs.get(i), cfhs.get(i));
}

this.refCount = new AtomicInteger(1);

ingestExternalFile();
}

private RocksDBStdSessions(HugeConfig config, String database, String store,
RocksDBStdSessions origin) {
super(config, database, store);

this.rocksdb = origin.rocksdb;
this.sstFileManager = origin.sstFileManager;
this.cfs = origin.cfs;
this.refCount = origin.refCount;

this.refCount.incrementAndGet();
}

@Override
public void open() throws Exception {
// pass
Expand Down Expand Up @@ -181,6 +201,12 @@ public String property(String property) {
}
}

@Override
public RocksDBSessions copy(HugeConfig config,
String database, String store) {
return new RocksDBStdSessions(config, database, store, this);
}

@Override
public final Session session() {
return (Session) super.getOrNewSession();
Expand All @@ -197,6 +223,11 @@ protected final Session newSession() {
protected synchronized void doClose() {
this.checkValid();

if (this.refCount.decrementAndGet() > 0) {
return;
}
assert this.refCount.get() == 0;

for (ColumnFamilyHandle cf : this.cfs.values()) {
cf.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public RocksDBStore(final BackendStoreProvider provider,
private void registerMetaHandlers() {
this.registerMetaHandler("metrics", (session, meta, args) -> {
List<RocksDBSessions> dbs = new ArrayList<>();
dbs.add(sessions);
dbs.add(this.sessions);
dbs.addAll(tableDBMapping().values());

RocksDBMetrics metrics = new RocksDBMetrics(dbs, session);
Expand Down Expand Up @@ -181,10 +181,11 @@ protected RocksDBSessions open(HugeConfig config, String dataPath,
sessions = this.openSessionPool(config, dataPath,
walPath, tableNames);
} catch (RocksDBException e) {
if (dbs.containsKey(dataPath)) {
RocksDBSessions origin = dbs.get(dataPath);
if (origin != null) {
if (e.getMessage().contains("No locks available")) {
// Open twice, but we should support keyspace
sessions = dbs.get(dataPath);
sessions = origin.copy(config, this.database, this.store);
}
}

Expand Down Expand Up @@ -222,6 +223,7 @@ protected RocksDBSessions open(HugeConfig config, String dataPath,
}

if (sessions != null) {
// May override the original session pool
dbs.put(dataPath, sessions);
sessions.session();
LOG.debug("Store opened: {}", dataPath);
Expand All @@ -235,12 +237,11 @@ protected RocksDBSessions openSessionPool(HugeConfig config,
List<String> tableNames)
throws RocksDBException {
if (tableNames == null) {
return new RocksDBStdSessions(config, dataPath, walPath,
this.database, this.store);
return new RocksDBStdSessions(config, this.database, this.store,
dataPath, walPath);
} else {
return new RocksDBStdSessions(config, dataPath, walPath,
this.database, this.store,
tableNames);
return new RocksDBStdSessions(config, this.database, this.store,
dataPath, walPath, tableNames);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public class RocksDBSstSessions extends RocksDBSessions {
private final String dataPath;
private final Map<String, SstFileWriter> tables;

public RocksDBSstSessions(HugeConfig config, String dataPath,
String database, String store) {
public RocksDBSstSessions(HugeConfig config, String database, String store,
String dataPath) {
super(config, database, store);

this.dataPath = dataPath;
Expand All @@ -71,6 +71,14 @@ public RocksDBSstSessions(HugeConfig config, String dataPath,
}
}

private RocksDBSstSessions(HugeConfig config, String database, String store,
RocksDBSstSessions origin) {
super(config, database, store);

this.dataPath = origin.dataPath;
this.tables = origin.tables;
}

@Override
public void open() throws Exception {
// pass
Expand Down Expand Up @@ -110,6 +118,13 @@ public String property(String property) {
throw new NotSupportException("RocksDBSstStore property()");
}

@Override
public RocksDBSessions copy(HugeConfig config,
String database, String store) {
return new RocksDBSstSessions(config, database, store, this);
}


private SstFileWriter table(String table) {
SstFileWriter sst = this.tables.get(table);
if (sst == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ protected RocksDBSessions openSessionPool(HugeConfig config,
List<String> tableNames)
throws RocksDBException {
if (tableNames == null) {
return new RocksDBSstSessions(config, dataPath, this.database(),
this.store());
return new RocksDBSstSessions(config, this.database(),
this.store(), dataPath);
} else {
return new RocksDBSstSessions(config, dataPath, this.database(),
this.store(), tableNames);
return new RocksDBSstSessions(config, this.database(), this.store(),
dataPath, tableNames);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,8 @@ public void testAddVertexWithCustomizeNumberIdStrategy() {
.create();
graph.addVertex(T.label, "programmer", T.id, 123456, "name", "marko",
"age", 18, "city", "Beijing");
graph.addVertex(T.label, "programmer", T.id, 61695499031416832L,
"name", "marko", "age", 18, "city", "Beijing");
graph.tx().commit();

List<Vertex> vertices = graph.traversal().V(123456).toList();
Expand All @@ -707,6 +709,16 @@ public void testAddVertexWithCustomizeNumberIdStrategy() {
assertContains(vertices,
T.label, "programmer", "name", "marko",
"age", 18, "city", "Beijing");

vertices = graph.traversal().V(61695499031416832L).toList();
Assert.assertEquals(1, vertices.size());
id = vertices.get(0).id();
Assert.assertEquals(IdGenerator.LongId.class, id.getClass());
Assert.assertEquals(61695499031416832L,
((IdGenerator.LongId) id).asLong());
assertContains(vertices,
T.label, "programmer", "name", "marko",
"age", 18, "city", "Beijing");
}

@Test
Expand Down

0 comments on commit 29d0632

Please sign in to comment.