Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AKI-695: Use batch database operations in production code #1142

Merged
merged 9 commits into from
Apr 15, 2020
5 changes: 3 additions & 2 deletions modAionImpl/src/org/aion/zero/impl/db/AionRepositoryImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,7 @@ public List<byte[]> getReferencedStorageNodes(byte[] value, int limit, AionAddre
byte[] subKey = h256(("details-storage/" + contract.toString()).getBytes());

ByteArrayKeyValueStore db =
new XorDataSource(selectDatabase(DatabaseType.STORAGE), subKey);
new XorDataSource(selectDatabase(DatabaseType.STORAGE), subKey, false);

Trie trie = new SecureTrie(db);
Map<ByteArrayWrapper, byte[]> refs = trie.getReferencedTrieNodes(value, limit);
Expand Down Expand Up @@ -1273,7 +1273,8 @@ public TrieNodeResult importTrieNode(byte[] key, byte[] value, DatabaseType dbTy
}
}

db.put(key, value);
db.putToBatch(key, value);
db.commit();
return TrieNodeResult.IMPORTED;
}

Expand Down
7 changes: 4 additions & 3 deletions modAionImpl/src/org/aion/zero/impl/db/AvmContractDetails.java
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,11 @@ public byte[] getEncoded() {
public void syncStorage() {
byte[] graph = getObjectGraph();
if (!Arrays.equals(graph, EMPTY_BYTE_ARRAY)) {
objectGraphSource.put(objectGraphHash, graph);
objectGraphSource.putToBatch(objectGraphHash, graph);
}
objectGraphSource.put(computeAvmStorageHash(), RLP.encodeList(RLP.encodeElement(storageTrie.getRootHash()), RLP.encodeElement(objectGraphHash)));

objectGraphSource.putToBatch(computeAvmStorageHash(), RLP.encodeList(RLP.encodeElement(storageTrie.getRootHash()), RLP.encodeElement(objectGraphHash)));
// commit both updates together
objectGraphSource.commit();
storageTrie.sync();
}

Expand Down
13 changes: 5 additions & 8 deletions modAionImpl/src/org/aion/zero/impl/db/DetailsDataStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public DetailsDataStore(
* @return the external storage data source associated with the given contract address
*/
private ByteArrayKeyValueStore createStorageSource(AionAddress address) {
return new XorDataSource(storageDSPrune, h256(("details-storage/" + address.toString()).getBytes()));
// NOTE: The consensus-correct Trie use for contracts requires not pushing deletions via the XorDataSource.
return new XorDataSource(storageDSPrune, h256(("details-storage/" + address.toString()).getBytes()), false);
}

/**
Expand All @@ -59,7 +60,7 @@ private ByteArrayKeyValueStore createStorageSource(AionAddress address) {
* @return the object graph data source associated with the given contract address
*/
private ByteArrayKeyValueStore createGraphSource(AionAddress address) {
return new XorDataSource(graphSrc, h256(("details-graph/" + address.toString()).getBytes()));
return new XorDataSource(graphSrc, h256(("details-graph/" + address.toString()).getBytes()), true);
}

/**
Expand Down Expand Up @@ -113,15 +114,11 @@ public StoredContractDetails newContractDetails(AionAddress address, InternalVmT
public synchronized void update(AionAddress key, StoredContractDetails contractDetails) {
// Put into cache.
byte[] rawDetails = contractDetails.getEncoded();
detailsSrc.put(key.toByteArray(), rawDetails);

detailsSrc.putToBatch(key.toByteArray(), rawDetails);
detailsSrc.commit(); // TODO AKI-309: flush in bulk by the repository
contractDetails.syncStorage();
}

public synchronized void remove(byte[] key) {
detailsSrc.delete(key);
}

public JournalPruneDataSource getStorageDSPrune() {
return storageDSPrune;
}
Expand Down
4 changes: 2 additions & 2 deletions modAionImpl/src/org/aion/zero/impl/db/PendingBlockStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ public int addBlockRange(List<Block> blocks, Logger log) {
int stored = addBlockRange(first, blockRange);

// save data to disk
indexSource.commitBatch();
indexSource.commit();
levelSource.commit();
queueSource.commit();

Expand Down Expand Up @@ -543,7 +543,7 @@ public void dropPendingQueues(long level, Collection<ByteArrayWrapper> queues, M
}

// push changed to disk
indexSource.commitBatch();
indexSource.commit();
queueSource.commit();
levelSource.commit();

Expand Down
9 changes: 0 additions & 9 deletions modAionImpl/src/org/aion/zero/impl/trie/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,6 @@ public synchronized Value get(byte[] key) {
return null;
}

public synchronized void delete(byte[] key) {
ByteArrayWrapper wrappedKey = wrap(key);
this.nodes.remove(wrappedKey);

if (dataSource != null) {
this.dataSource.delete(key);
}
}

public synchronized void commit(boolean flushCache) {
// Don't try to commit if it isn't dirty
if ((dataSource == null) || !this.isDirty) {
Expand Down
2 changes: 2 additions & 0 deletions modAionImpl/src/org/aion/zero/impl/trie/TrieImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,7 @@ private void appendHashes(byte[] bytes, ArrayList<byte[]> hashes) {
public long saveFullStateToDatabase(byte[] stateRoot, ByteArrayKeyValueDatabase db) {
ExtractToDatabase traceAction = new ExtractToDatabase(db);
traceTrie(stateRoot, traceAction);
db.commit();
return traceAction.count;
}

Expand All @@ -1024,6 +1025,7 @@ private void traceDiffTrie(byte[] stateRoot, ScanAction action, ByteArrayKeyValu
public long saveDiffStateToDatabase(byte[] stateRoot, ByteArrayKeyValueDatabase db) {
ExtractToDatabase traceAction = new ExtractToDatabase(db);
traceDiffTrie(stateRoot, traceAction, db);
db.commit();
return traceAction.count;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public ExtractToDatabase(ByteArrayKeyValueDatabase _db) {

@Override
public void doOnNode(byte[] hash, Value node) {
db.put(hash, dummy_value);
db.putToBatch(hash, dummy_value);
count++;
}
}
19 changes: 2 additions & 17 deletions modDbImpl/src/org/aion/db/generic/LockedDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,6 @@ public void close() {
}
}

@Override
public boolean commit() {
// acquire write lock
lock.writeLock().lock();

try {
return database.commit();
} catch (Exception e) {
throw e;
} finally {
// releasing write lock
lock.writeLock().unlock();
}
}

@Override
public void compact() {
// acquire write lock
Expand Down Expand Up @@ -316,12 +301,12 @@ public void deleteInBatch(byte[] key) {
}

@Override
public void commitBatch() {
public void commit() {
// acquire write lock
lock.writeLock().lock();

try {
database.commitBatch();
database.commit();
} catch (Exception e) {
if (e instanceof RuntimeException) {
throw e;
Expand Down
16 changes: 3 additions & 13 deletions modDbImpl/src/org/aion/db/generic/TimedDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,6 @@ public void close() {
LOG.debug(database.toString() + " close() in " + (t2 - t1) + " ns.");
}

@Override
public boolean commit() {
long t1 = System.nanoTime();
boolean cmt = database.commit();
long t2 = System.nanoTime();

LOG.debug(database.toString() + " commit() in " + (t2 - t1) + " ns.");
return cmt;
}

@Override
public void compact() {
long t1 = System.nanoTime();
Expand Down Expand Up @@ -251,12 +241,12 @@ public void deleteInBatch(byte[] key) {
}

@Override
public void commitBatch() {
public void commit() {
long t1 = System.nanoTime();
database.commitBatch();
database.commit();
long t2 = System.nanoTime();

LOG.debug(database.toString() + " commitBatch() in " + (t2 - t1) + " ns.");
LOG.debug(database.toString() + " commit() in " + (t2 - t1) + " ns.");
}

@Override
Expand Down
11 changes: 0 additions & 11 deletions modDbImpl/src/org/aion/db/impl/AbstractDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,6 @@ protected String propertiesInfo() {
+ ">"; //
}

@Override
public boolean commit() {
// not implemented since we always commit the changes to the database for this
// implementation
throw new UnsupportedOperationException(
"Only automatic commits are supported by " + this.toString());
}

@Override
public void compact() {
LOG.warn("Compact not supported by " + this.toString() + ".");
Expand Down Expand Up @@ -145,9 +137,6 @@ public boolean isLocked() {
return false;
}

/** Functionality for directly interacting with the heap cache. */
public abstract boolean commitCache(Map<ByteArrayWrapper, byte[]> cache);

@Override
public Optional<byte[]> get(byte[] key) {
check(key);
Expand Down
12 changes: 0 additions & 12 deletions modDbImpl/src/org/aion/db/impl/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,6 @@ public interface Database {
*/
void close();

/**
* Makes all changes made since the previous commit/rollback permanent and releases any database
* locks currently held by this Connection object. This method should be used only when
* auto-commit mode has been disabled.
*
* @return {@code true} if the changes were successfully committed to storage, {@code false} if
* the changes could not be committed to storage
* @throws RuntimeException if the data store is closed
* @implNote Returns {@code true} with no other effect when auto-commit is already enabled.
*/
boolean commit();

/** Reduce the size of the database when possible. */
void compact();

Expand Down
5 changes: 4 additions & 1 deletion modDbImpl/src/org/aion/db/impl/KeyValueStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,11 @@ public interface KeyValueStore<KeyT, ValueT> extends AutoCloseable {
/**
* Pushes updates made using {@link #putToBatch(Object, Object)} and {@link
* #deleteInBatch(Object)} to the underlying data source.
* All changes since the last commit will be made permanent.
*
* @throws RuntimeException if the underlying data store is closed
*/
void commitBatch();
void commit();

/**
* Puts or updates the data store with the given <i>key-value</i> pairs, as follows:
Expand Down
30 changes: 3 additions & 27 deletions modDbImpl/src/org/aion/db/impl/h2/H2MVMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,9 @@ public void deleteInBatchInternal(byte[] key) {
}

@Override
public void commitBatch() {
public void commit() {
check();

// nothing to do since batch operations are not supported
}

Expand Down Expand Up @@ -296,32 +298,6 @@ public void deleteBatchInternal(Collection<byte[]> keys) {
}
}

// AbstractDB functionality
// ----------------------------------------------------------------------------------------

public boolean commitCache(Map<ByteArrayWrapper, byte[]> cache) {
boolean success = false;

try {
check();

// doesn't actually have functionality for batch operations
for (Entry<ByteArrayWrapper, byte[]> e : cache.entrySet()) {
if (e.getValue() == null) {
map.remove(e.getKey().toBytes());
} else {
map.put(e.getKey().toBytes(), e.getValue());
}
}

success = true;
} catch (Exception e) {
LOG.error("Unable to commit heap cache to " + this.toString() + ".", e);
}

return success;
}

/**
* Compact the database file, that is, compact blocks that have a low fill rate, and move chunks
* next to each other. This will typically shrink the database file. Changes are flushed to the
Expand Down
36 changes: 3 additions & 33 deletions modDbImpl/src/org/aion/db/impl/leveldb/LevelDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,9 @@ public void deleteInBatchInternal(byte[] key) {
}

@Override
public void commitBatch() {
public void commit() {
check();

if (batch != null) {
try {
db.write(batch);
Expand Down Expand Up @@ -426,36 +428,4 @@ public void deleteBatchInternal(Collection<byte[]> keys) {
LOG.error("Unable to close WriteBatch object in " + this.toString() + ".", e);
}
}

// AbstractDB functionality
// ----------------------------------------------------------------------------------------

public boolean commitCache(Map<ByteArrayWrapper, byte[]> cache) {
boolean success = false;

check();

// try-with-resources will automatically close the batch object
try (WriteBatch batch = db.createWriteBatch()) {
// add put and delete operations to batch
for (Map.Entry<ByteArrayWrapper, byte[]> e : cache.entrySet()) {
if (e.getValue() == null) {
batch.delete(e.getKey().toBytes());
} else {
batch.put(e.getKey().toBytes(), e.getValue());
}
}

// bulk atomic update
db.write(batch);

success = true;
} catch (DBException e) {
LOG.error("Unable to commit heap cache to " + this.toString() + ".", e);
} catch (IOException e) {
LOG.error("Unable to close WriteBatch object in " + this.toString() + ".", e);
}

return success;
}
}
28 changes: 3 additions & 25 deletions modDbImpl/src/org/aion/db/impl/mockdb/MockDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ public void deleteInBatchInternal(byte[] key) {
}

@Override
public void commitBatch() {
public void commit() {
check();

// nothing to do since batch operations are not supported
}

Expand Down Expand Up @@ -155,28 +157,4 @@ public void deleteBatchInternal(Collection<byte[]> keys) {
public void drop() {
kv.clear();
}

public boolean commitCache(Map<ByteArrayWrapper, byte[]> cache) {
boolean success = false;

try {
check();

// simply do a put, because setting a kv pair to null is same as delete
cache.forEach(
(key, value) -> {
if (value == null) {
kv.remove(key);
} else {
kv.put(key, value);
}
});

success = true;
} catch (Exception e) {
LOG.error("Unable to commit heap cache to " + this.toString() + ".", e);
}

return success;
}
}
Loading