Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add transient tx map to DaoState to speed up getTx queries #3773

Merged
merged 3 commits into from
Dec 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/java/bisq/core/dao/DaoFacade.java
Original file line number Diff line number Diff line change
Expand Up @@ -624,8 +624,8 @@ public Set<TxOutput> getUnspentTxOutputs() {
return daoStateService.getUnspentTxOutputs();
}

public Set<Tx> getTxs() {
return daoStateService.getTxs();
public int getNumTxs() {
return daoStateService.getNumTxs();
}

public Optional<TxOutput> getLockupTxOutput(String txId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void maybeExportToJson() {
// Access to daoStateService is single threaded, we must not access daoStateService from the thread.
List<JsonTxOutput> allJsonTxOutputs = new ArrayList<>();

List<JsonTx> jsonTxs = daoStateService.getTxStream()
List<JsonTx> jsonTxs = daoStateService.getUnorderedTxStream()
.map(tx -> {
JsonTx jsonTx = getJsonTx(tx);
allJsonTxOutputs.addAll(jsonTx.getOutputs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import bisq.core.dao.node.parser.exceptions.BlockHeightNotConnectingException;
import bisq.core.dao.state.DaoStateService;
import bisq.core.dao.state.model.blockchain.Block;
import bisq.core.dao.state.model.blockchain.Tx;

import bisq.common.app.DevEnv;

Expand All @@ -31,7 +30,6 @@
import javax.inject.Inject;

import java.util.LinkedList;
import java.util.List;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -55,7 +53,6 @@ public class BlockParser {
// Constructor
///////////////////////////////////////////////////////////////////////////////////////////

@SuppressWarnings("WeakerAccess")
@Inject
public BlockParser(TxParser txParser,
DaoStateService daoStateService) {
Expand Down Expand Up @@ -106,14 +103,13 @@ public Block parseBlock(RawBlock rawBlock) throws BlockHashNotConnectingExceptio
// one get resolved.
// Lately there is a patter with 24 iterations observed
long startTs = System.currentTimeMillis();
List<Tx> txList = block.getTxs();

rawBlock.getRawTxs().forEach(rawTx ->
txParser.findTx(rawTx,
genesisTxId,
genesisBlockHeight,
genesisTotalSupply)
.ifPresent(txList::add));
.ifPresent(tx -> daoStateService.onNewTxForLastBlock(block, tx)));

log.info("Parsing {} transactions at block height {} took {} ms", rawBlock.getRawTxs().size(),
blockHeight, System.currentTimeMillis() - startTs);
Expand Down
69 changes: 42 additions & 27 deletions core/src/main/java/bisq/core/dao/state/DaoStateService.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import bisq.core.dao.state.model.governance.Issuance;
import bisq.core.dao.state.model.governance.IssuanceType;
import bisq.core.dao.state.model.governance.ParamChange;
import bisq.core.util.coin.BsqFormatter;
import bisq.core.util.ParsingUtils;
import bisq.core.util.coin.BsqFormatter;

import org.bitcoinj.core.Coin;

Expand Down Expand Up @@ -115,6 +115,8 @@ public void applySnapshot(DaoState snapshot) {

daoState.setChainHeight(snapshot.getChainHeight());

daoState.setTxCache(snapshot.getTxCache());

daoState.getBlocks().clear();
daoState.getBlocks().addAll(snapshot.getBlocks());

Expand Down Expand Up @@ -226,7 +228,25 @@ public void onNewBlockWithEmptyTxs(Block block) {
}
}

// Third we get the onParseBlockComplete called after all rawTxs of blocks have been parsed
// Third we add each successfully parsed BSQ tx to the last block
public void onNewTxForLastBlock(Block block, Tx tx) {
assertDaoStateChange();

getLastBlock().ifPresent(lastBlock -> {
if (block == lastBlock) {
// We need to ensure that the txs in all blocks are in sync with the txs in our txMap (cache).
block.addTx(tx);
daoState.addToTxCache(tx);
} else {
// Not clear if this case can happen but at onNewBlockWithEmptyTxs we handle such a potential edge
// case as well, so we need to reflect that here as well.
log.warn("Block for parsing does not match last block. That might happen in edge cases at reorgs. " +
"Received block={}", block);
}
});
}

// Fourth we get the onParseBlockComplete called after all rawTxs of blocks have been parsed
public void onParseBlockComplete(Block block) {
if (parseBlockChainComplete)
log.info("Parse block completed: Block height {}, {} BSQ transactions.", block.getHeight(), block.getTxs().size());
Expand Down Expand Up @@ -343,29 +363,24 @@ public Optional<Tx> getGenesisTx() {
// Tx
///////////////////////////////////////////////////////////////////////////////////////////

public Stream<Tx> getTxStream() {
return getBlocks().stream()
.flatMap(block -> block.getTxs().stream());
public Stream<Tx> getUnorderedTxStream() {
return daoState.getTxCache().values().stream();
}

public TreeMap<String, Tx> getTxMap() {
return new TreeMap<>(getTxStream().collect(Collectors.toMap(Tx::getId, tx -> tx)));
}

public Set<Tx> getTxs() {
return getTxStream().collect(Collectors.toSet());
}

public Optional<Tx> getTx(String txId) {
return getTxStream().filter(tx -> tx.getId().equals(txId)).findAny();
public int getNumTxs() {
return daoState.getTxCache().size();
}

public List<Tx> getInvalidTxs() {
return getTxStream().filter(tx -> tx.getTxType() == TxType.INVALID).collect(Collectors.toList());
return getUnorderedTxStream().filter(tx -> tx.getTxType() == TxType.INVALID).collect(Collectors.toList());
}

public List<Tx> getIrregularTxs() {
return getTxStream().filter(tx -> tx.getTxType() == TxType.IRREGULAR).collect(Collectors.toList());
return getUnorderedTxStream().filter(tx -> tx.getTxType() == TxType.IRREGULAR).collect(Collectors.toList());
}

public Optional<Tx> getTx(String txId) {
return Optional.ofNullable(daoState.getTxCache().get(txId));
}

public boolean containsTx(String txId) {
Expand Down Expand Up @@ -395,11 +410,11 @@ public boolean hasTxBurntFee(String txId) {
}

public long getTotalBurntFee() {
return getTxStream().mapToLong(Tx::getBurntFee).sum();
return getUnorderedTxStream().mapToLong(Tx::getBurntFee).sum();
}

public Set<Tx> getBurntFeeTxs() {
return getTxStream()
return getUnorderedTxStream()
.filter(tx -> tx.getBurntFee() > 0)
.collect(Collectors.toSet());
}
Expand All @@ -418,17 +433,17 @@ public Optional<TxOutput> getConnectedTxOutput(TxInput txInput) {
// TxOutput
///////////////////////////////////////////////////////////////////////////////////////////

public Stream<TxOutput> getTxOutputStream() {
return getTxStream()
private Stream<TxOutput> getUnorderedTxOutputStream() {
return getUnorderedTxStream()
.flatMap(tx -> tx.getTxOutputs().stream());
}

public boolean existsTxOutput(TxOutputKey key) {
return getTxOutputStream().anyMatch(txOutput -> txOutput.getKey().equals(key));
return getUnorderedTxOutputStream().anyMatch(txOutput -> txOutput.getKey().equals(key));
}

public Optional<TxOutput> getTxOutput(TxOutputKey txOutputKey) {
return getTxOutputStream()
return getUnorderedTxOutputStream()
.filter(txOutput -> txOutput.getKey().equals(txOutputKey))
.findAny();
}
Expand Down Expand Up @@ -513,8 +528,8 @@ public boolean isTxOutputSpendable(TxOutputKey key) {
// TxOutputType
///////////////////////////////////////////////////////////////////////////////////////////

public Set<TxOutput> getTxOutputsByTxOutputType(TxOutputType txOutputType) {
return getTxOutputStream()
private Set<TxOutput> getTxOutputsByTxOutputType(TxOutputType txOutputType) {
return getUnorderedTxOutputStream()
.filter(txOutput -> txOutput.getTxOutputType() == txOutputType)
.collect(Collectors.toSet());
}
Expand Down Expand Up @@ -823,12 +838,12 @@ public long getTotalAmountOfConfiscatedTxOutputs() {
}

public long getTotalAmountOfInvalidatedBsq() {
return getTxStream().mapToLong(Tx::getInvalidatedBsq).sum();
return getUnorderedTxStream().mapToLong(Tx::getInvalidatedBsq).sum();
}

// Contains burnt fee and invalidated bsq due invalid txs
public long getTotalAmountOfBurntBsq() {
return getTxStream().mapToLong(Tx::getBurntBsq).sum();
return getUnorderedTxStream().mapToLong(Tx::getBurntBsq).sum();
}

// Confiscate bond
Expand Down
30 changes: 29 additions & 1 deletion core/src/main/java/bisq/core/dao/state/model/DaoState.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import bisq.core.dao.state.model.blockchain.Block;
import bisq.core.dao.state.model.blockchain.SpentInfo;
import bisq.core.dao.state.model.blockchain.Tx;
import bisq.core.dao.state.model.blockchain.TxOutput;
import bisq.core.dao.state.model.blockchain.TxOutputKey;
import bisq.core.dao.state.model.governance.Cycle;
Expand All @@ -28,22 +29,25 @@
import bisq.core.dao.state.model.governance.ParamChange;

import bisq.common.proto.persistable.PersistablePayload;
import bisq.common.util.JsonExclude;

import com.google.protobuf.Message;

import javax.inject.Inject;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;


/**
* Root class for mutable state of the DAO.
* Holds both blockchain data as well as data derived from the governance process (voting).
Expand Down Expand Up @@ -98,6 +102,10 @@ public static DaoState getClone(DaoState daoState) {
@Getter
private final List<DecryptedBallotsWithMerits> decryptedBallotsWithMeritsList;

// Transient data used only as an index - must be kept in sync with the block list
@JsonExclude
private transient final Map<String, Tx> txCache; // key is txId


///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
Expand Down Expand Up @@ -145,6 +153,10 @@ private DaoState(int chainHeight,
this.paramChangeList = paramChangeList;
this.evaluatedProposalList = evaluatedProposalList;
this.decryptedBallotsWithMeritsList = decryptedBallotsWithMeritsList;

txCache = blocks.stream()
.flatMap(block -> block.getTxs().stream())
.collect(Collectors.toMap(Tx::getId, Function.identity(), (x, y) -> x, HashMap::new));
}

@Override
Expand Down Expand Up @@ -224,6 +236,21 @@ public byte[] getSerializedStateForHashChain() {
return getBsqStateBuilderExcludingBlocks().addBlocks(getBlocks().getLast().toProtoMessage()).build().toByteArray();
}

public void addToTxCache(Tx tx) {
// We shouldn't get duplicate txIds, but use putIfAbsent instead of put for consistency with the map merge
// function used in the constructor to initialise txCache (and to exactly match the pre-caching behaviour).
txCache.putIfAbsent(tx.getId(), tx);
}

public void setTxCache(Map<String, Tx> txCache) {
this.txCache.clear();
this.txCache.putAll(txCache);
}

public Map<String, Tx> getTxCache() {
return Collections.unmodifiableMap(txCache);
}

@Override
public String toString() {
return "DaoState{" +
Expand All @@ -237,6 +264,7 @@ public String toString() {
",\n paramChangeList=" + paramChangeList +
",\n evaluatedProposalList=" + evaluatedProposalList +
",\n decryptedBallotsWithMeritsList=" + decryptedBallotsWithMeritsList +
",\n txCache=" + txCache +
"\n}";
}
}
15 changes: 13 additions & 2 deletions core/src/main/java/bisq/core/dao/state/model/blockchain/Block.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import com.google.common.collect.ImmutableList;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import lombok.EqualsAndHashCode;
import lombok.Value;

/**
* The Block which gets persisted in the DaoState. During parsing transactions can be
Expand All @@ -44,8 +44,8 @@
*
*/
@EqualsAndHashCode(callSuper = true)
@Value
public final class Block extends BaseBlock implements PersistablePayload, ImmutableDaoStateModel {
// We do not expose txs with a Lombok getter. We cannot make it immutable as we add transactions during parsing.
private final List<Tx> txs;

public Block(int height, long time, String hash, String previousBlockHash) {
Expand Down Expand Up @@ -93,6 +93,17 @@ public static Block fromProto(protobuf.BaseBlock proto) {
txs);
}

public void addTx(Tx tx) {
txs.add(tx);
}

// We want to guarantee that no client can modify the list. We use unmodifiableList and not ImmutableList as
// we want that clients reflect any change to the source list. Also ImmutableList is more expensive as it
// creates a copy.
public List<Tx> getTxs() {
return Collections.unmodifiableList(txs);
}

@Override
public String toString() {
return "Block{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void onParseBlockCompleteAfterBatchProcessing(Block block) {
///////////////////////////////////////////////////////////////////////////////////////////

private void updateWithBsqBlockChainData() {
allTxTextField.setText(String.valueOf(daoFacade.getTxs().size()));
allTxTextField.setText(String.valueOf(daoFacade.getNumTxs()));
utxoTextField.setText(String.valueOf(daoFacade.getUnspentTxOutputs().size()));
compensationIssuanceTxTextField.setText(String.valueOf(daoFacade.getNumIssuanceTransactions(IssuanceType.COMPENSATION)));
reimbursementIssuanceTxTextField.setText(String.valueOf(daoFacade.getNumIssuanceTransactions(IssuanceType.REIMBURSEMENT)));
Expand Down