Skip to content

Commit

Permalink
DataSourceArray can no longer be accessed outside the db module:
Browse files Browse the repository at this point in the history
 - Knowledge of the sizeKey is removed from external tests.
 - Internal module tests check for correct storage of the size key in
 exceptional error cases that may cause it to miss from the database.
 - Removed the use of Flushable interface which will be removed once
 all implementations are updated.
  • Loading branch information
AlexandraRoatis committed Sep 5, 2019
1 parent 4a36af3 commit aace86d
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 145 deletions.
13 changes: 7 additions & 6 deletions modAionImpl/src/org/aion/zero/impl/db/AionBlockStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.aion.db.impl.ByteArrayKeyValueDatabase;
import org.aion.db.store.ArrayStore;
import org.aion.db.store.DataSource;
import org.aion.db.store.DataSource.Type;
import org.aion.db.store.DataSourceArray;
import org.aion.db.store.ObjectDataSource;
import org.aion.db.store.Serializer;
import org.aion.db.store.Stores;
import org.aion.log.AionLoggerFactory;
import org.aion.log.LogEnum;
import org.aion.mcf.blockchain.Block;
Expand All @@ -50,7 +51,7 @@ public class AionBlockStore implements IBlockStorePow {

protected Lock lock = new ReentrantLock();

private DataSourceArray<List<BlockInfo>> index;
private ArrayStore<List<BlockInfo>> index;
private ObjectDataSource<Block> blocks;

private boolean checkIntegrity = true;
Expand All @@ -65,7 +66,7 @@ public AionBlockStore(ByteArrayKeyValueDatabase index, ByteArrayKeyValueDatabase
}

public AionBlockStore(ByteArrayKeyValueDatabase index, ByteArrayKeyValueDatabase blocks, boolean checkIntegrity, int blockCacheSize) {
this.index = new DataSourceArray<>(new ObjectDataSource(index, BLOCK_INFO_SERIALIZER));
this.index = Stores.newArrayStore(index, BLOCK_INFO_SERIALIZER);
// Note: because of cache use the blocks db should write lock on get as well
this.blocks =
new DataSource<>(blocks, BLOCK_SERIALIZER)
Expand Down Expand Up @@ -156,7 +157,7 @@ public void flush() {
lock.lock();
try {
blocks.flush();
index.flush();
index.commit();
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -1226,15 +1227,15 @@ public boolean isMainChain(byte[] hash, long level) {
*/
public void correctSize(long maxNumber, Logger log) {
// correcting the size if smaller than should be
long storedSize = index.getStoredSize();
long storedSize = index.size();
if (maxNumber >= storedSize) {
// can't change size directly, so we do a put + delete the next level to reset it
index.set(maxNumber + 1, new ArrayList<>());
index.remove(maxNumber + 1);
log.info(
"Corrupted index size corrected from {} to {}.",
storedSize,
index.getStoredSize());
index.size());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.aion.zero.impl.core.ImportResult;
import org.aion.zero.impl.trie.TrieImpl;
import org.aion.util.bytes.ByteUtil;
import org.aion.util.conversions.Hex;
import org.aion.vm.avm.LongLivedAvm;
import org.aion.zero.impl.types.BlockContext;
import org.aion.zero.impl.db.AionRepositoryImpl;
Expand Down Expand Up @@ -527,10 +526,6 @@ public void testRecoverIndexWithPartialIndex_MainChain() {
assertThat(deletedInfo.get(level)).isEqualTo(indexDatabase.get(indexKey).get());
}

// ensure the size key is correct
byte[] sizeKey = Hex.decode("FFFFFFFFFFFFFFFF");
assertThat(indexDatabase.get(sizeKey).isPresent()).isTrue();

// 2: recovery at import

repo.flush();
Expand Down Expand Up @@ -565,9 +560,6 @@ public void testRecoverIndexWithPartialIndex_MainChain() {
byte[] indexKey = ByteUtil.intToBytes(key.intValue());
assertThat(deletedInfo.get(key)).isEqualTo(indexDatabase.get(indexKey).get());
}

// ensure the size key is correct
assertThat(indexDatabase.get(sizeKey).isPresent()).isTrue();
}

/** Test the recovery of the index with start from the index of an ancestor block. */
Expand Down Expand Up @@ -694,10 +686,6 @@ public void testRecoverIndexWithPartialIndex_ShorterSideChain() {
assertThat(deletedInfo.get(level)).isEqualTo(indexDatabase.get(indexKey).get());
}

// ensure the size key is correct
byte[] sizeKey = Hex.decode("FFFFFFFFFFFFFFFF");
assertThat(indexDatabase.get(sizeKey).isPresent()).isTrue();

// 2: recovery at import

repo.flush();
Expand Down Expand Up @@ -745,9 +733,6 @@ public void testRecoverIndexWithPartialIndex_ShorterSideChain() {
// NOTE: this checks the correction of both main chain and side chain recovery
assertThat(deletedInfo.get(key)).isEqualTo(indexDatabase.get(indexKey).get());
}

// ensure the size key is correct
assertThat(indexDatabase.get(sizeKey).isPresent()).isTrue();
}

/** Test the index recovery when the index database contains only the size and genesis index. */
Expand Down Expand Up @@ -819,10 +804,6 @@ public void testRecoverIndexWithStartFromGenesis() {
assertThat(deletedInfo.get(level)).isEqualTo(indexDatabase.get(indexKey).get());
}

// ensure the size key is correct
byte[] sizeKey = Hex.decode("FFFFFFFFFFFFFFFF");
assertThat(indexDatabase.get(sizeKey).isPresent()).isTrue();

// 2: recovery at import

repo.flush();
Expand Down Expand Up @@ -857,9 +838,6 @@ public void testRecoverIndexWithStartFromGenesis() {
byte[] indexKey = ByteUtil.intToBytes(key.intValue());
assertThat(deletedInfo.get(key)).isEqualTo(indexDatabase.get(indexKey).get());
}

// ensure the size key is correct
assertThat(indexDatabase.get(sizeKey).isPresent()).isTrue();
}

/**
Expand Down Expand Up @@ -976,10 +954,6 @@ public void testRecoverIndexWithStartFromGenesisWithoutSize() {
repo.flush();
Map<Long, byte[]> deletedInfo = new HashMap<>();

byte[] sizeKey = Hex.decode("FFFFFFFFFFFFFFFF");
deletedInfo.put(-1L, indexDatabase.get(sizeKey).get());
indexDatabase.delete(sizeKey);

for (Map.Entry<Long, byte[]> entry : blocksToDelete.entrySet()) {
byte[] indexKey = ByteUtil.intToBytes(entry.getKey().intValue());
// saving the data for checking recovery
Expand All @@ -1002,11 +976,6 @@ public void testRecoverIndexWithStartFromGenesisWithoutSize() {
assertThat(worked).isTrue();
assertThat(repo.isIndexed(bestBlock.getHash(), bestBlock.getNumber())).isTrue();

// ensure the size key was recovered
assertThat(indexDatabase.get(sizeKey).isPresent()).isTrue();
assertThat(indexDatabase.get(sizeKey).get()).isEqualTo(deletedInfo.get(-1L));
deletedInfo.remove(-1L);

// check that the index information is correct
for (Map.Entry<Long, byte[]> entry : blocksToDelete.entrySet()) {
long level = entry.getKey();
Expand All @@ -1021,8 +990,6 @@ public void testRecoverIndexWithStartFromGenesisWithoutSize() {
// 2: recovery at import

repo.flush();
deletedInfo.put(-1L, indexDatabase.get(sizeKey).get());
indexDatabase.delete(sizeKey);

for (Map.Entry<Long, byte[]> entry : blocksToDelete.entrySet()) {
byte[] indexKey = ByteUtil.intToBytes(entry.getKey().intValue());
Expand All @@ -1049,11 +1016,6 @@ public void testRecoverIndexWithStartFromGenesisWithoutSize() {
// ensure that the index was recovered
assertThat(repo.isIndexed(bestBlock.getHash(), bestBlock.getNumber())).isTrue();

// ensure the size key was recovered
assertThat(indexDatabase.get(sizeKey).isPresent()).isTrue();
assertThat(indexDatabase.get(sizeKey).get()).isEqualTo(deletedInfo.get(-1L));
deletedInfo.remove(-1L);

// check that the index information is correct at database level
for (Long key : blocksToDelete.keySet()) {
// checking at database level
Expand Down Expand Up @@ -1165,10 +1127,6 @@ public void testRecoverIndex_wDeletedBlock() {
// ensure that the index was recovered
assertThat(repo.isIndexed(bestBlock.getHash(), bestBlock.getNumber())).isFalse();

// ensure the size key is correct
byte[] sizeKey = Hex.decode("FFFFFFFFFFFFFFFF");
assertThat(indexDatabase.get(sizeKey).isPresent()).isTrue();

// 2: recovery at import

repo.flush();
Expand Down Expand Up @@ -1228,8 +1186,5 @@ public void testRecoverIndex_wDeletedBlock() {
byte[] indexKey = ByteUtil.intToBytes(key.intValue());
assertThat(deletedInfo.get(key)).isEqualTo(indexDatabase.get(indexKey).get());
}

// ensure the size key is correct
assertThat(indexDatabase.get(sizeKey).isPresent()).isTrue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
import java.util.List;
import java.util.Map;
import org.aion.db.impl.ByteArrayKeyValueDatabase;
import org.aion.db.store.ArrayStore;
import org.aion.db.store.Stores;
import org.aion.log.AionLoggerFactory;
import org.aion.mcf.blockchain.Block;
import org.aion.util.bytes.ByteUtil;
import org.aion.zero.impl.core.ImportResult;
import org.aion.zero.impl.db.AionBlockStore;
import org.aion.zero.impl.db.AionBlockStore.BlockInfo;
import org.aion.zero.impl.db.AionRepositoryImpl;
import org.aion.db.store.DataSourceArray;
import org.aion.db.store.ObjectDataSource;
import org.junit.BeforeClass;
import org.junit.Test;

Expand Down Expand Up @@ -151,16 +152,15 @@ public void testIndexIntegrityWithRecovery() {
ByteArrayKeyValueDatabase indexDatabase = repo.getIndexDatabase();

// corrupting the index at level 2
DataSourceArray<List<AionBlockStore.BlockInfo>> index =
new DataSourceArray<>(new ObjectDataSource<>(indexDatabase, BLOCK_INFO_SERIALIZER));
List<AionBlockStore.BlockInfo> infos = index.get(2);
ArrayStore<List<BlockInfo>> index = Stores.newArrayStore(indexDatabase, BLOCK_INFO_SERIALIZER);
List<BlockInfo> infos = index.get(2);
assertThat(infos.size()).isEqualTo(2);

for (AionBlockStore.BlockInfo bi : infos) {
bi.setCummDifficulty(bi.getCummDifficulty().add(BigInteger.TEN));
}
index.set(2, infos);
index.flush();
index.commit();

AionBlockStore blockStore = repo.getBlockStore();

Expand Down
33 changes: 33 additions & 0 deletions modDbImpl/src/org/aion/db/store/ArrayStore.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.aion.db.store;

import java.io.Closeable;

/**
* A key value store that interacts with objects that are serialized to byte arrays and deserialized
* back into objects using a specified {@link Serializer} implementation. The stored data is
* interpreted as an array allowing access according to the defined {@code long} index values. The
* elements of the array are indexed from 0 to highest stored index value.
*
* @param <V> the class of objects used by a specific implementation
* @author Alexandra Roatis
*/
public interface ArrayStore<V> extends Closeable {

/** Inserts an object at the specified index. */
void set(long index, V value);

/** Removes the object at the given index. */
void remove(long index);

/** Pushes changes to the underlying database. */
void commit();

/** Retrieves the object stored at the given index. */
V get(long index);

/** Retrieves the size of the array defined as the highest stored entry plus one. */
long size();

/** Returns {@code true} to indicate that the database is open, {@code false} otherwise. */
boolean isOpen();
}
6 changes: 1 addition & 5 deletions modDbImpl/src/org/aion/db/store/DataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public enum Type {
}

/**
* Required data for {@link ObjectDataSource} and {@link DataSourceArray}.
* Required data for {@link ObjectDataSource}.
*
* @param src the source database
* @param serializer the serializer used to convert data to byte arrays and vice versa
Expand Down Expand Up @@ -92,8 +92,4 @@ public ObjectDataSource<V> buildObjectSource() {
// in case the given cache size is equal to zero
return new ObjectDataSource<>(src, serializer);
}

public DataSourceArray<V> buildArraySource() {
return new DataSourceArray<>(this.buildObjectSource());
}
}
41 changes: 28 additions & 13 deletions modDbImpl/src/org/aion/db/store/DataSourceArray.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package org.aion.db.store;

import java.io.Closeable;
import com.google.common.annotations.VisibleForTesting;
import java.util.Optional;
import org.aion.db.Flushable;
import org.aion.db.impl.ByteArrayKeyValueDatabase;
import org.aion.util.bytes.ByteUtil;
import org.aion.util.conversions.Hex;

Expand All @@ -11,22 +11,26 @@
*
* @param <V>
*/
public class DataSourceArray<V> implements Flushable, Closeable {
class DataSourceArray<V> implements ArrayStore<V> {

private final ObjectDataSource<V> src;
private static final byte[] sizeKey = Hex.decode("FFFFFFFFFFFFFFFF");
private final ByteArrayKeyValueDatabase db;
@VisibleForTesting
static final byte[] sizeKey = Hex.decode("FFFFFFFFFFFFFFFF");
private long size = -1L;

public DataSourceArray(ObjectDataSource<V> src) {
this.src = src;
DataSourceArray(ByteArrayKeyValueDatabase database, Serializer<V> serializer) {
this.db = database;
this.src = new DataSource<>(db, serializer).buildObjectSource();
}

@Override
public void flush() {
public void commit() {
src.flush();
}

public V set(long index, V value) {
@Override
public void set(long index, V value) {
if (index <= Integer.MAX_VALUE) {
src.put(ByteUtil.intToBytes((int) index), value);
} else {
Expand All @@ -35,9 +39,9 @@ public V set(long index, V value) {
if (index >= size()) {
setSize(index + 1);
}
return value;
}

@Override
public void remove(long index) {
// without this check it will remove the sizeKey
if (index < 0 || index >= size()) {
Expand All @@ -54,6 +58,7 @@ public void remove(long index) {
}
}

@Override
public V get(long index) {
if (index < 0 || index >= size()) {
throw new IndexOutOfBoundsException(
Expand All @@ -79,14 +84,14 @@ public long getStoredSize() {

// Read the value from the database directly and
// convert to the size, and if it doesn't exist, 0.
Optional<byte[]> optBytes = src.getSrc().get(sizeKey);
Optional<byte[]> optBytes = db.get(sizeKey);
if (!optBytes.isPresent()) {
size = 0L;
} else {
byte[] bytes = optBytes.get();

if (bytes.length == 4) {
size = (long) ByteUtil.byteArrayToInt(bytes);
size = ByteUtil.byteArrayToInt(bytes);
} else {
size = ByteUtil.byteArrayToLong(bytes);
}
Expand All @@ -95,6 +100,7 @@ public long getStoredSize() {
return size;
}

@Override
public long size() {

if (size < 0) {
Expand All @@ -107,14 +113,23 @@ public long size() {
private synchronized void setSize(long newSize) {
size = newSize;
if (size <= Integer.MAX_VALUE) {
src.getSrc().put(sizeKey, ByteUtil.intToBytes((int) newSize));
db.put(sizeKey, ByteUtil.intToBytes((int) newSize));
} else {
src.getSrc().put(sizeKey, ByteUtil.longToBytes(newSize));
db.put(sizeKey, ByteUtil.longToBytes(newSize));
}
}

@Override
public boolean isOpen() {
return src.isOpen();
}

@Override
public void close() {
// ensures that the size is written to disk if it was previously missing
if (!db.get(sizeKey).isPresent()) {
setSize(size);
}
src.close();
}
}
Loading

0 comments on commit aace86d

Please sign in to comment.