Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AKI 317: TransactionStore enhancements #1017

Merged
merged 3 commits into from
Oct 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ public List<Block> getBlocksByRange(long first, long last) {
public AionTxInfo getTransactionInfo(byte[] hash) {

// Try to get info if the hash is from an invokable transaction
List<AionTxInfo> infos = getTransactionInfoByAlias(hash);
Map<ByteArrayWrapper, AionTxInfo> infos = getTransactionInfoByAlias(hash);

// If we didn't find the alias for an invokable
if (infos == null || infos.isEmpty()) {
Expand All @@ -525,21 +525,13 @@ public AionTxInfo getTransactionInfo(byte[] hash) {
}

AionTxInfo txInfo = null;
if (infos.size() == 1) {
txInfo = infos.get(0);
} else {
// pick up the receipt from the block on the main chain
for (AionTxInfo info : infos) {
Block block = getBlockStore().getBlockByHash(info.getBlockHash());
if (block == null) continue;

Block mainBlock = getBlockStore().getChainBlockByNumber(block.getNumber());
if (mainBlock == null) continue;

if (Arrays.equals(info.getBlockHash(), mainBlock.getHash())) {
txInfo = info;
break;
}
// pick up the receipt from the block on the main chain
for (ByteArrayWrapper blockHash : infos.keySet()) {
if (!isMainChain(blockHash.toBytes())) {
continue;
} else {
txInfo = infos.get(blockHash);
break;
}
}
if (txInfo == null) {
Expand All @@ -561,16 +553,16 @@ public AionTxInfo getTransactionInfoLite(byte[] txHash, byte[] blockHash) {
return transactionStore.getTxInfo(txHash, blockHash);
}

private List<AionTxInfo> getTransactionInfoByAlias(byte[] innerHash) {
private Map<ByteArrayWrapper, AionTxInfo> getTransactionInfoByAlias(byte[] innerHash) {
Set<ByteArrayWrapper> metaTxHashes = transactionStore.getAliases(innerHash);

if (metaTxHashes == null) return null; // No aliases found

List<AionTxInfo> infoList = new ArrayList<>();
Map<ByteArrayWrapper, AionTxInfo> infoList = new HashMap<>();
for (ByteArrayWrapper metaTxHash : metaTxHashes) {
List<AionTxInfo> metaTxInfos = transactionStore.getTxInfo(metaTxHash.toBytes());
Map<ByteArrayWrapper, AionTxInfo> metaTxInfos = transactionStore.getTxInfo(metaTxHash.toBytes());
if (metaTxInfos != null) {
infoList.addAll(metaTxInfos);
infoList.putAll(metaTxInfos);
}
}
return infoList; // Had metaTx hash, but was not found in mainchain
Expand Down Expand Up @@ -1462,12 +1454,12 @@ public Pair<AionBlockSummary, RepositoryCache> add(
List<AionTxExecSummary> execSummaries = summary.getSummaries();

for (int i = 0; i < receipts.size(); i++) {
AionTxInfo infoWithInternalTxs = AionTxInfo.newInstanceWithInternalTransactions(receipts.get(i), block.getHash(), i, execSummaries.get(i).getInternalTransactions());
AionTxInfo infoWithInternalTxs = AionTxInfo.newInstanceWithInternalTransactions(receipts.get(i), block.getHashWrapper(), i, execSummaries.get(i).getInternalTransactions());

if (storeInternalTransactions) {
transactionStore.putTxInfoToBatch(infoWithInternalTxs);
} else {
AionTxInfo info = AionTxInfo.newInstance(receipts.get(i), block.getHash(), i);
AionTxInfo info = AionTxInfo.newInstance(receipts.get(i), block.getHashWrapper(), i);
transactionStore.putTxInfoToBatch(info);
}

Expand Down Expand Up @@ -1843,12 +1835,12 @@ private void storeBlock(Block block, List<AionTxReceipt> receipts, List<AionTxEx
getBlockStore().saveBlock(block, td, !fork);

for (int i = 0; i < receipts.size(); i++) {
AionTxInfo infoWithInternalTxs = AionTxInfo.newInstanceWithInternalTransactions(receipts.get(i), block.getHash(), i, summaries.get(i).getInternalTransactions());
AionTxInfo infoWithInternalTxs = AionTxInfo.newInstanceWithInternalTransactions(receipts.get(i), block.getHashWrapper(), i, summaries.get(i).getInternalTransactions());

if (storeInternalTransactions) {
transactionStore.putTxInfoToBatch(infoWithInternalTxs);
} else {
AionTxInfo info = AionTxInfo.newInstance(receipts.get(i), block.getHash(), i);
AionTxInfo info = AionTxInfo.newInstance(receipts.get(i), block.getHashWrapper(), i);
transactionStore.putTxInfoToBatch(info);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,37 +1,41 @@
package org.aion.zero.impl.db;

import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import org.aion.db.store.Serializer;
import org.aion.log.AionLoggerFactory;
import org.aion.log.LogEnum;
import org.aion.rlp.RLP;
import org.aion.rlp.RLPList;
import org.aion.util.types.ByteArrayWrapper;
import org.aion.zero.impl.types.AionTxInfo;
import org.slf4j.Logger;

public class AionTransactionStoreSerializer {
private static final Logger LOG = AionLoggerFactory.getLogger(LogEnum.TX.toString());

public static final Serializer<List<AionTxInfo>> serializer =
public static final Serializer<Map<ByteArrayWrapper, AionTxInfo>> serializer =
new Serializer<>() {
@Override
public byte[] serialize(List<AionTxInfo> object) {
public byte[] serialize(Map<ByteArrayWrapper, AionTxInfo> object) {
byte[][] txsRlp = new byte[object.size()][];
for (int i = 0; i < txsRlp.length; i++) {
txsRlp[i] = object.get(i).getEncoded();
int i = 0;
for (AionTxInfo info : object.values()) {
txsRlp[i] = info.getEncoded();
i++;
}
return RLP.encodeList(txsRlp);
}

@Override
public List<AionTxInfo> deserialize(byte[] stream) {
public Map<ByteArrayWrapper, AionTxInfo> deserialize(byte[] stream) {
try {
RLPList params = RLP.decode2(stream);
RLPList infoList = (RLPList) params.get(0);
List<AionTxInfo> ret = new ArrayList<>();
Map<ByteArrayWrapper, AionTxInfo> ret = new HashMap<>();
for (int i = 0; i < infoList.size(); i++) {
ret.add(AionTxInfo.newInstanceFromEncoding(infoList.get(i).getRLPData()));
AionTxInfo info = AionTxInfo.newInstanceFromEncoding(infoList.get(i).getRLPData());
ret.put(info.blockHash, info);
}
return ret;
} catch (Exception e) {
Expand Down
11 changes: 6 additions & 5 deletions modAionImpl/src/org/aion/zero/impl/db/DBUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.aion.mcf.blockchain.Block;
import org.aion.zero.impl.config.CfgDb;
import org.aion.base.AccountState;
import org.aion.util.types.ByteArrayWrapper;
import org.aion.zero.impl.core.ImportResult;
import org.aion.mcf.db.Repository;
import org.aion.mcf.db.RepositoryCache;
Expand Down Expand Up @@ -702,16 +703,16 @@ public static Status queryTransaction(byte[] txHash) {
AionBlockchainImpl blockchain = new AionBlockchainImpl(cfg, false);

try {
List<AionTxInfo> txInfoList = blockchain.getTransactionStore().getTxInfo(txHash);
Map<ByteArrayWrapper, AionTxInfo> txInfoList = blockchain.getTransactionStore().getTxInfo(txHash);

if (txInfoList == null || txInfoList.isEmpty()) {
System.out.println("Can not find the transaction with given hash.");
return Status.FAILURE;
}

for (AionTxInfo info : txInfoList) {
for (Map.Entry<ByteArrayWrapper, AionTxInfo> entry : txInfoList.entrySet()) {

Block block = blockchain.getBlockStore().getBlockByHash(info.getBlockHash());
Block block = blockchain.getBlockStore().getBlockByHash(entry.getKey().toBytes());
if (block == null) {
System.out.println(
"Can not find the block data with given block hash of the transaction info.");
Expand All @@ -720,7 +721,7 @@ public static Status queryTransaction(byte[] txHash) {
return Status.FAILURE;
}

AionTransaction tx = block.getTransactionsList().get(info.getIndex());
AionTransaction tx = block.getTransactionsList().get(entry.getValue().getIndex());

if (tx == null) {
System.out.println("Can not find the transaction data with given hash.");
Expand All @@ -730,7 +731,7 @@ public static Status queryTransaction(byte[] txHash) {
}

System.out.println(tx.toString());
System.out.println(info.toString());
System.out.println(entry.getValue());
System.out.println();
}

Expand Down
52 changes: 17 additions & 35 deletions modAionImpl/src/org/aion/zero/impl/db/TransactionStore.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package org.aion.zero.impl.db;

import static org.aion.util.others.Utils.dummy;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand All @@ -18,47 +15,35 @@
import org.aion.types.InternalTransaction;
import org.aion.util.types.ByteArrayWrapper;
import org.aion.zero.impl.types.AionTxInfo;
import org.apache.commons.collections4.map.LRUMap;

public class TransactionStore implements Closeable {
private final LRUMap<ByteArrayWrapper, Object> lastSavedTxHash = new LRUMap<>(5000);
private final ObjectStore<List<AionTxInfo>> txInfoSource;
private final ObjectStore<Map<ByteArrayWrapper, AionTxInfo>> txInfoSource;
private final ObjectStore<Set<ByteArrayWrapper>> aliasSource;

private final ReadWriteLock lock = new ReentrantReadWriteLock();

public TransactionStore(
ByteArrayKeyValueDatabase txInfoSrc, Serializer<List<AionTxInfo>> serializer) {
public TransactionStore(ByteArrayKeyValueDatabase txInfoSrc, Serializer<Map<ByteArrayWrapper, AionTxInfo>> serializer) {
// TODO AKI-436: introduce caching of recent transactions
txInfoSource = Stores.newObjectStore(txInfoSrc, serializer);
aliasSource = Stores.newObjectStore(txInfoSrc, aliasSerializer);
}

public boolean putTxInfoToBatch(AionTxInfo tx) {
public void putTxInfoToBatch(AionTxInfo tx) {

lock.writeLock().lock();

try {
byte[] txHash = tx.getReceipt().getTransaction().getTransactionHash();

List<AionTxInfo> existingInfos = null;
if (lastSavedTxHash.put(ByteArrayWrapper.wrap(txHash), dummy) != null
|| !lastSavedTxHash.isFull()) {
existingInfos = txInfoSource.get(txHash);
}

Map<ByteArrayWrapper, AionTxInfo> existingInfos = txInfoSource.get(txHash);
if (existingInfos == null) {
existingInfos = new ArrayList<>();
} else {
for (AionTxInfo info : existingInfos) {
if (Arrays.equals(info.getBlockHash(), tx.getBlockHash())) {
return false;
}
}
existingInfos = new HashMap<>();
}
existingInfos.add(tx);
txInfoSource.putToBatch(txHash, existingInfos);

return true;
// overwrites existing entry to update it with/without internal transactions
// depending on the chosen configuration at block import
existingInfos.put(tx.blockHash, tx);
txInfoSource.putToBatch(txHash, existingInfos);
} finally {
lock.writeLock().unlock();
}
Expand Down Expand Up @@ -94,22 +79,19 @@ public void flushBatch() {
}

public AionTxInfo getTxInfo(byte[] txHash, byte[] blockHash) {
// ensuring non-null input since this can be called with input from the API
if (txHash == null || blockHash == null) return null;

lock.readLock().lock();

try {
List<AionTxInfo> existingInfos = txInfoSource.get(txHash);
for (AionTxInfo info : existingInfos) {
if (Arrays.equals(info.getBlockHash(), blockHash)) {
return info;
}
}
return null;
return txInfoSource.get(txHash).get(ByteArrayWrapper.wrap(blockHash));
} finally {
lock.readLock().unlock();
}
}

public List<AionTxInfo> getTxInfo(byte[] key) {
public Map<ByteArrayWrapper, AionTxInfo> getTxInfo(byte[] key) {
lock.readLock().lock();
try {
return txInfoSource.get(key);
Expand Down
15 changes: 8 additions & 7 deletions modAionImpl/src/org/aion/zero/impl/types/AionTxInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.aion.types.InternalTransaction.RejectedStatus;
import org.aion.util.conversions.Hex;
import org.aion.base.AionTxReceipt;
import org.aion.util.types.ByteArrayWrapper;
import org.slf4j.Logger;

public class AionTxInfo {
Expand All @@ -24,13 +25,13 @@ public class AionTxInfo {

// note that the receipt is modified to set the transaction
private final AionTxReceipt receipt;
private final byte[] blockHash;
public final ByteArrayWrapper blockHash;
private final int index;
private final List<InternalTransaction> internalTransactions;
private final boolean createdWithInternalTransactions;

/** @implNote Instance creation should be done through the static factory methods. */
private AionTxInfo(AionTxReceipt receipt, byte[] blockHash, int index, List<InternalTransaction> internalTransactions, boolean createdWithInternalTransactions) {
private AionTxInfo(AionTxReceipt receipt, ByteArrayWrapper blockHash, int index, List<InternalTransaction> internalTransactions, boolean createdWithInternalTransactions) {
this.receipt = receipt;
this.blockHash = blockHash;
this.index = index;
Expand All @@ -42,12 +43,12 @@ private AionTxInfo(AionTxReceipt receipt, byte[] blockHash, int index, List<Inte
* Creates an instance with the base data: receipt, block hash and index. Does not stored
* information regarding internal transactions.
*/
public static AionTxInfo newInstance(AionTxReceipt receipt, byte[] blockHash, int index) {
public static AionTxInfo newInstance(AionTxReceipt receipt, ByteArrayWrapper blockHash, int index) {
return new AionTxInfo(receipt, blockHash, index, null, false);
}

/** Creates an instance with receipt, block hash, index and internal transactions. */
public static AionTxInfo newInstanceWithInternalTransactions(AionTxReceipt receipt, byte[] blockHash, int index, List<InternalTransaction> internalTransactions) {
public static AionTxInfo newInstanceWithInternalTransactions(AionTxReceipt receipt, ByteArrayWrapper blockHash, int index, List<InternalTransaction> internalTransactions) {
return new AionTxInfo(receipt, blockHash, index, internalTransactions, true);
}

Expand Down Expand Up @@ -89,7 +90,7 @@ private static AionTxInfo decode(byte[] rlp) {
RLPList txInfo = (RLPList) params.get(0);

AionTxReceipt receipt = new AionTxReceipt(txInfo.get(INDEX_RECEIPT).getRLPData());
byte[] blockHash = txInfo.get(INDEX_BLOCK_HASH).getRLPData();
ByteArrayWrapper blockHash = ByteArrayWrapper.wrap(txInfo.get(INDEX_BLOCK_HASH).getRLPData());

int index;
RLPItem indexRLP = (RLPItem) txInfo.get(INDEX_TX_INDEX);
Expand Down Expand Up @@ -139,7 +140,7 @@ public void setTransaction(AionTransaction tx) {
public byte[] getEncoded() {

byte[] receiptRLP = this.receipt.toBytes();
byte[] blockHashRLP = RLP.encodeElement(blockHash);
byte[] blockHashRLP = RLP.encodeElement(blockHash.toBytes());
byte[] indexRLP = RLP.encodeInt(index);
byte[] completeRLP = RLP.encodeByte(createdWithInternalTransactions ? (byte) 1 : (byte) 0);

Expand All @@ -163,7 +164,7 @@ public AionTxReceipt getReceipt() {
}

public byte[] getBlockHash() {
return blockHash;
return blockHash.toBytes();
}

public int getIndex() {
Expand Down
5 changes: 2 additions & 3 deletions modApiServer/test/org/aion/api/server/types/TxTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void testTxInfoToJsonDetails(){
nrgLimit);
byte[] hash = block.getHash();

JSONObject object = Tx.aionTxInfoToDetailsJSON(AionTxInfo.newInstance(txReceipt,hash,0), block);
JSONObject object = Tx.aionTxInfoToDetailsJSON(AionTxInfo.newInstance(txReceipt,block.getHashWrapper(),0), block);
assertNotNull(object);
assertEquals(ByteUtil.byteArrayToLong(txNonce), object.get("nonce"));
assertEquals(StringUtils.toJsonHex(data), object.get("input"));
Expand Down Expand Up @@ -134,9 +134,8 @@ public void testTxInfoToJsonDetails(){
solution,
nrgConsumed,
nrgLimit);
byte[] hashWithLog = block.getHash();

JSONObject jsonWithLogs = Tx.aionTxInfoToDetailsJSON(AionTxInfo.newInstance(txReceiptWithLog, hashWithLog, 0), blockWithLog);
JSONObject jsonWithLogs = Tx.aionTxInfoToDetailsJSON(AionTxInfo.newInstance(txReceiptWithLog, blockWithLog.getHashWrapper(), 0), blockWithLog);

assertNotNull(jsonWithLogs);
List<Object> jsonArray = jsonWithLogs.getJSONArray("logs").toList();
Expand Down