Add transient tx map to DaoState to speed up getTx queries #3773

Merged
merged 3 commits into from
Dec 16, 2019
4 changes: 3 additions & 1 deletion core/src/main/java/bisq/core/dao/DaoFacade.java
@@ -94,6 +94,7 @@
import java.io.IOException;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
@@ -624,7 +625,8 @@ public Set<TxOutput> getUnspentTxOutputs() {
return daoStateService.getUnspentTxOutputs();
}

public Set<Tx> getTxs() {
// Returns a view rather than a copy of all the txs.
public Collection<Tx> getTxs() {
Contributor

Since the client is only interested in the number of txs, I think we should change this to a method that returns only the size. That would also make the getTxs() method in daoStateService unnecessary.

return daoStateService.getTxs();
}

@@ -22,7 +22,6 @@
import bisq.core.dao.node.parser.exceptions.BlockHeightNotConnectingException;
import bisq.core.dao.state.DaoStateService;
import bisq.core.dao.state.model.blockchain.Block;
import bisq.core.dao.state.model.blockchain.Tx;

import bisq.common.app.DevEnv;

@@ -31,7 +30,6 @@
import javax.inject.Inject;

import java.util.LinkedList;
import java.util.List;

import lombok.extern.slf4j.Slf4j;

@@ -106,14 +104,13 @@ public Block parseBlock(RawBlock rawBlock) throws BlockHashNotConnectingExceptio
// one get resolved.
// Lately there is a pattern with 24 iterations observed
long startTs = System.currentTimeMillis();
List<Tx> txList = block.getTxs();

rawBlock.getRawTxs().forEach(rawTx ->
txParser.findTx(rawTx,
genesisTxId,
genesisBlockHeight,
genesisTotalSupply)
.ifPresent(txList::add));
.ifPresent(tx -> daoStateService.onNewTxForLastBlock(block, tx)));

log.info("Parsing {} transactions at block height {} took {} ms", rawBlock.getRawTxs().size(),
blockHeight, System.currentTimeMillis() - startTs);
60 changes: 38 additions & 22 deletions core/src/main/java/bisq/core/dao/state/DaoStateService.java
@@ -35,14 +35,18 @@
import bisq.core.dao.state.model.governance.Issuance;
import bisq.core.dao.state.model.governance.IssuanceType;
import bisq.core.dao.state.model.governance.ParamChange;
import bisq.core.util.coin.BsqFormatter;
import bisq.core.util.ParsingUtils;
import bisq.core.util.coin.BsqFormatter;

import org.bitcoinj.core.Coin;

import javax.inject.Inject;

import com.google.common.base.Preconditions;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedList;
@@ -115,6 +119,9 @@ public void applySnapshot(DaoState snapshot) {

daoState.setChainHeight(snapshot.getChainHeight());

daoState.getTxMap().clear();
daoState.getTxMap().putAll(snapshot.getTxMap());
Contributor

How did caching at the DaoState level compare to caching at the Block level? Keeping objects in sync is complicated, and I'd be interested in understanding whether the simpler block-level cache delivers most of the gain without any of the synchronization complication.

Contributor Author

I'm not quite sure what you mean by caching at the block level - do you mean adding a transient field of some kind to Block?

Contributor

Yes. Instead of keeping a map of all txns in the DaoState object that needs to be kept in sync, the Block could just cache its own list of transactions, and the lookup functions would change from O(txns) to O(blocks). I only bring this up because it would require less complexity, and since there are no tests it may make it easier to guarantee correctness through code review alone.
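
To make the trade-off concrete, here is a minimal sketch of the two lookup strategies being compared. `SimpleTx`, `SimpleBlock`, and `LookupSketch` are hypothetical stand-ins, not the Bisq classes, and the block-level variant assumes each block keeps its own txId index.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins for illustration only; these are not the Bisq model classes.
final class SimpleTx {
    final String id;

    SimpleTx(String id) {
        this.id = id;
    }
}

final class SimpleBlock {
    // Per-block index (txId -> tx). Only this block has to keep it in sync.
    private final Map<String, SimpleTx> txById = new HashMap<>();

    void addTx(SimpleTx tx) {
        txById.put(tx.id, tx);
    }

    Optional<SimpleTx> findTx(String txId) {
        return Optional.ofNullable(txById.get(txId));
    }
}

final class LookupSketch {
    // Block-level cache: may have to visit every block, so the cost is O(blocks).
    static Optional<SimpleTx> findViaBlocks(List<SimpleBlock> blocks, String txId) {
        for (SimpleBlock block : blocks) {
            Optional<SimpleTx> tx = block.findTx(txId);
            if (tx.isPresent()) {
                return tx;
            }
        }
        return Optional.empty();
    }

    // State-level map: a single hash lookup, but the map must mirror the block list.
    static Optional<SimpleTx> findViaStateMap(Map<String, SimpleTx> txMap, String txId) {
        return Optional.ofNullable(txMap.get(txId));
    }
}
```

Both variants return the same tx as long as the state-level map is updated whenever a tx is added to a block, which is the synchronization cost raised in this thread.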

Contributor

I fear that might not have much effect on performance, as there are usually very few txs in a block. On average we have 1 tx per 2 blocks.

Contributor

The snapshot is created from the persisted state, but the transient map isn't saved to disk. Is the tx map always empty after applying a snapshot?

Contributor Author

No, it shouldn't be because the snapshot is just another DaoState instance constructed via DaoState.fromProto(), and in that method it recalculates and populates the transient field.

Contributor

Indeed. The DaoStateStore calls DaoState.fromProto so the cache would be created.
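
The point made in this thread can be sketched as follows: if the transient index is built in the constructor, every instance created from persisted data gets a populated index even though the index itself is never saved. `StateSketch` and `fromPersisted` are hypothetical names; the real `DaoState.fromProto()` works on a protobuf message, which is not modelled here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified state object; not the Bisq DaoState class.
final class StateSketch {
    // Persisted data: each block is represented as a list of txIds.
    private final List<List<String>> blocks;
    // Derived index (txId -> block position); never persisted.
    private final transient Map<String, Integer> blockIndexByTxId;

    // Every construction path, including the factory below, ends up here,
    // so the index is recomputed from the block list each time.
    StateSketch(List<List<String>> blocks) {
        this.blocks = blocks;
        this.blockIndexByTxId = new HashMap<>();
        for (int i = 0; i < blocks.size(); i++) {
            for (String txId : blocks.get(i)) {
                blockIndexByTxId.putIfAbsent(txId, i);
            }
        }
    }

    // Stand-in for a fromProto-style factory: it receives persisted data only.
    static StateSketch fromPersisted(List<List<String>> persistedBlocks) {
        return new StateSketch(new ArrayList<>(persistedBlocks));
    }

    Map<String, Integer> getBlockIndexByTxId() {
        return blockIndexByTxId;
    }
}
```

Under this assumption, copying the index out of a snapshot (as `applySnapshot` does with `getTxMap()`) never sees an empty map unless the snapshot has no txs.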


daoState.getBlocks().clear();
daoState.getBlocks().addAll(snapshot.getBlocks());

@@ -226,7 +233,16 @@ public void onNewBlockWithEmptyTxs(Block block) {
}
}

// Third we get the onParseBlockComplete called after all rawTxs of blocks have been parsed
// Third we add each successfully parsed BSQ tx to the last block
public void onNewTxForLastBlock(Block block, Tx tx) {
// At least one block must be present else no rawTx would have been recognised as a BSQ tx.
Contributor

We should add the assertDaoStateChange(); check here for consistency. All methods that alter the DAO state call that method to guard against changes outside of the permitted process. The DAO state must only be changed during parsing.

Preconditions.checkArgument(block == getLastBlock().orElseThrow());
Contributor @chimp1984, Dec 11, 2019

This would change the original behaviour.

We used onNewBlockWithEmptyTxs to add a block, but if we got into the if path we did not add the block and only logged a warning. Adding transactions would have had no effect, as it was a local list in the original code. Now we throw an exception in such a case because we expect that the block has been added.

EDIT:
To be clearer: we should use an if check and ignore the tx if the block is not the last block. This would reflect the existing behaviour.

public void onNewBlockWithEmptyTxs(Block block) {
    assertDaoStateChange();
    if (daoState.getBlocks().isEmpty() && block.getHeight() != getGenesisBlockHeight()) {
        log.warn("We don't have any blocks yet and we received a block which is not the genesis block. " +
                "We ignore that block as the first block need to be the genesis block. " +
                "That might happen in edge cases at reorgs. Received block={}", block);
    } else {
        daoState.getBlocks().add(block);

        if (parseBlockChainComplete)
            log.info("New Block added at blockHeight {}", block.getHeight());
    }
}

I am not sure if that case is valid and can happen, but as the log suggests there might be tricky edge cases in re-org scenarios where this was possible. Testing those edge cases is pretty tricky, and it may be that this was an issue during development which disappeared later and is no longer present. But I would prefer to stay very conservative/restrictive in the DAO domain, as a consensus bug can have severe consequences and the DAO has a very deep level of complexity. If we are not 100% sure that the existing code is wrong I prefer to stick with it, as this code base has been tested extensively and has been in production since April.
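
A minimal sketch of the if-guarded variant suggested above, as opposed to the `Preconditions.checkArgument` call. `GuardSketch` and its nested `Block` are hypothetical simplifications, and the boolean return value is added only to make the outcome observable; it is not part of the PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified service; method names mirror the PR but this is not the Bisq code.
final class GuardSketch {
    static final class Block {
        final int height;
        final List<String> txIds = new ArrayList<>();

        Block(int height) {
            this.height = height;
        }
    }

    private final List<Block> blocks = new ArrayList<>();
    private final Map<String, Block> blockByTxId = new HashMap<>();

    void addBlock(Block block) {
        blocks.add(block);
    }

    // Returns true if the tx was recorded, false if the block was ignored.
    boolean onNewTxForLastBlock(Block block, String txId) {
        boolean isLastBlock = !blocks.isEmpty() && blocks.get(blocks.size() - 1) == block;
        if (!isLastBlock) {
            // No exception: a block that was never added to the state leaves the state untouched.
            return false;
        }
        block.txIds.add(txId);
        blockByTxId.put(txId, block);
        return true;
    }

    boolean containsTx(String txId) {
        return blockByTxId.containsKey(txId);
    }
}
```

With this shape, a block that was skipped by `onNewBlockWithEmptyTxs` simply has no effect on the index, instead of aborting parsing with an exception.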

Contributor

NACK for that Preconditions check, otherwise it looks good. Thanks for working on that.
I need a bit more time to go over everything again and check whether anything is missing.

Could you provide a comparison of the performance gains from this PR?


block.getTxs().add(tx);
daoState.getTxMap().put(tx.getId(), tx);
}

// Fourth we get the onParseBlockComplete called after all rawTxs of blocks have been parsed
public void onParseBlockComplete(Block block) {
if (parseBlockChainComplete)
log.info("Parse block completed: Block height {}, {} BSQ transactions.", block.getHeight(), block.getTxs().size());
@@ -348,24 +364,24 @@ public Stream<Tx> getTxStream() {
.flatMap(block -> block.getTxs().stream());
}

public TreeMap<String, Tx> getTxMap() {
return new TreeMap<>(getTxStream().collect(Collectors.toMap(Tx::getId, tx -> tx)));
private Stream<Tx> getUnorderedTxStream() {
return getTxs().stream();
}

public Set<Tx> getTxs() {
return getTxStream().collect(Collectors.toSet());
}

public Optional<Tx> getTx(String txId) {
return getTxStream().filter(tx -> tx.getId().equals(txId)).findAny();
public Collection<Tx> getTxs() {
Contributor

This method is only called by DaoFacade, where the client is only interested in the size, so I would suggest adding a new getNumTxs() method instead. The other usage is the getUnorderedTxStream() method in the same class, which could use daoState.getTxMap().values().stream() directly. So we can remove getTxs() completely.
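
A minimal sketch of what this suggestion could look like. `TxCountSketch` is a hypothetical simplification in which plain strings stand in for `Tx` objects; only the `getNumTxs()` and `getUnorderedTxStream()` names come from this review.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

// Hypothetical, simplified service; the String values stand in for Tx objects.
final class TxCountSketch {
    private final Map<String, String> txMap = new HashMap<>();

    void put(String txId, String tx) {
        txMap.put(txId, tx);
    }

    // Exposes only the count, so no collection (view or copy) is handed to callers.
    int getNumTxs() {
        return txMap.size();
    }

    // Internal consumers stream the map values directly; iteration order is unspecified.
    private Stream<String> getUnorderedTxStream() {
        return txMap.values().stream();
    }

    // Example of an internal consumer that does not depend on ordering.
    long countMatching(String value) {
        return getUnorderedTxStream().filter(value::equals).count();
    }
}
```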

return Collections.unmodifiableCollection(daoState.getTxMap().values());
}

public List<Tx> getInvalidTxs() {
return getTxStream().filter(tx -> tx.getTxType() == TxType.INVALID).collect(Collectors.toList());
return getUnorderedTxStream().filter(tx -> tx.getTxType() == TxType.INVALID).collect(Collectors.toList());
}

public List<Tx> getIrregularTxs() {
return getTxStream().filter(tx -> tx.getTxType() == TxType.IRREGULAR).collect(Collectors.toList());
return getUnorderedTxStream().filter(tx -> tx.getTxType() == TxType.IRREGULAR).collect(Collectors.toList());
}

public Optional<Tx> getTx(String txId) {
return Optional.ofNullable(daoState.getTxMap().get(txId));
}

public boolean containsTx(String txId) {
@@ -395,11 +411,11 @@ public boolean hasTxBurntFee(String txId) {
}

public long getTotalBurntFee() {
return getTxStream().mapToLong(Tx::getBurntFee).sum();
return getUnorderedTxStream().mapToLong(Tx::getBurntFee).sum();
}

public Set<Tx> getBurntFeeTxs() {
return getTxStream()
return getUnorderedTxStream()
.filter(tx -> tx.getBurntFee() > 0)
.collect(Collectors.toSet());
}
@@ -418,17 +434,17 @@ public Optional<TxOutput> getConnectedTxOutput(TxInput txInput) {
// TxOutput
///////////////////////////////////////////////////////////////////////////////////////////

public Stream<TxOutput> getTxOutputStream() {
return getTxStream()
private Stream<TxOutput> getUnorderedTxOutputStream() {
return getUnorderedTxStream()
.flatMap(tx -> tx.getTxOutputs().stream());
}

public boolean existsTxOutput(TxOutputKey key) {
return getTxOutputStream().anyMatch(txOutput -> txOutput.getKey().equals(key));
return getUnorderedTxOutputStream().anyMatch(txOutput -> txOutput.getKey().equals(key));
}

public Optional<TxOutput> getTxOutput(TxOutputKey txOutputKey) {
return getTxOutputStream()
return getUnorderedTxOutputStream()
.filter(txOutput -> txOutput.getKey().equals(txOutputKey))
.findAny();
}
@@ -513,8 +529,8 @@ public boolean isTxOutputSpendable(TxOutputKey key) {
// TxOutputType
///////////////////////////////////////////////////////////////////////////////////////////

public Set<TxOutput> getTxOutputsByTxOutputType(TxOutputType txOutputType) {
return getTxOutputStream()
private Set<TxOutput> getTxOutputsByTxOutputType(TxOutputType txOutputType) {
return getUnorderedTxOutputStream()
.filter(txOutput -> txOutput.getTxOutputType() == txOutputType)
.collect(Collectors.toSet());
}
@@ -823,12 +839,12 @@ public long getTotalAmountOfConfiscatedTxOutputs() {
}

public long getTotalAmountOfInvalidatedBsq() {
return getTxStream().mapToLong(Tx::getInvalidatedBsq).sum();
return getUnorderedTxStream().mapToLong(Tx::getInvalidatedBsq).sum();
}

// Contains burnt fee and invalidated bsq due invalid txs
public long getTotalAmountOfBurntBsq() {
return getTxStream().mapToLong(Tx::getBurntBsq).sum();
return getUnorderedTxStream().mapToLong(Tx::getBurntBsq).sum();
}

// Confiscate bond
14 changes: 14 additions & 0 deletions core/src/main/java/bisq/core/dao/state/model/DaoState.java
@@ -19,6 +19,7 @@

import bisq.core.dao.state.model.blockchain.Block;
import bisq.core.dao.state.model.blockchain.SpentInfo;
import bisq.core.dao.state.model.blockchain.Tx;
import bisq.core.dao.state.model.blockchain.TxOutput;
import bisq.core.dao.state.model.blockchain.TxOutputKey;
import bisq.core.dao.state.model.governance.Cycle;
@@ -28,16 +29,19 @@
import bisq.core.dao.state.model.governance.ParamChange;

import bisq.common.proto.persistable.PersistablePayload;
import bisq.common.util.JsonExclude;

import com.google.protobuf.Message;

import javax.inject.Inject;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

import lombok.Getter;
@@ -98,6 +102,11 @@ public static DaoState getClone(DaoState daoState) {
@Getter
private final List<DecryptedBallotsWithMerits> decryptedBallotsWithMeritsList;

// Transient data used only as an index - must be kept in sync with the block list
@Getter
@JsonExclude
private transient final Map<String, Tx> txMap; // key is txId
Contributor

How does the memory footprint change with this cache and how is it expected to scale over time?

Contributor Author

It's using a hash table rather than a tree set, so I don't think the additional memory will be a problem - there are only about 10,000 txs or so right now, so I don't think it will take up more than 100KB or so. (The fact that DaoState.blocks is a linked list instead of an array list is probably a more significant memory issue that could be easily fixed.)



///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
@@ -145,6 +154,10 @@ private DaoState(int chainHeight,
this.paramChangeList = paramChangeList;
this.evaluatedProposalList = evaluatedProposalList;
this.decryptedBallotsWithMeritsList = decryptedBallotsWithMeritsList;

txMap = blocks.stream()
.flatMap(block -> block.getTxs().stream())
.collect(Collectors.toMap(Tx::getId, Function.identity(), (x, y) -> y, HashMap::new));
Contributor

I assume the mergeFunction is only passed because you want the mapFactory. I'm not sure what the mergeFunction should really do, as conflicts are not expected and it is not clear how to handle them. Maybe throwing an exception would be more appropriate here? Or maybe just add a comment.

Contributor Author

I wanted to guarantee that the output is a HashMap and it looks like the only toMap overload with a mapFactory param also has a mergeFunction param, so I was forced to provide a dummy merge function.
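
For reference, a minimal sketch of the four-argument `Collectors.toMap` overload with a merge function that rejects duplicates rather than picking one, which is the alternative raised above. `StrictIndexSketch` and its nested `Tx` are hypothetical; this is not what the PR does.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class StrictIndexSketch {
    // Hypothetical stand-in for the real Tx class.
    static final class Tx {
        private final String id;

        Tx(String id) {
            this.id = id;
        }

        String getId() {
            return id;
        }
    }

    // Arguments: key mapper, value mapper, merge function, map factory.
    // The map factory pins the result type to HashMap; the merge function fails fast on conflicts.
    static Map<String, Tx> indexStrict(List<Tx> txs) {
        return txs.stream().collect(Collectors.toMap(
                Tx::getId,
                Function.identity(),
                (x, y) -> {
                    throw new IllegalStateException("Duplicate txId " + x.getId());
                },
                HashMap::new));
    }
}
```

Whether failing fast is desirable here depends on how duplicates were treated before this change, which the rest of this thread discusses.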

Contributor

Yes, I expected that intention...

I just was wondering how we can be sure to not change behaviour. The previous code used the flatMap.

public Stream<Tx> getTxStream() {
    return getBlocks().stream()
            .flatMap(block -> block.getTxs().stream());
}

Do you know how potential key conflicts would have been handled there? I assume that overwriting with the new value, as your mergeFunction does, is likely the standard behaviour if not otherwise defined. So your mergeFunction is likely better than throwing an exception if flatMap behaves the same. Anyway, a bit "esoteric", but the DAO might deserve a bit of extra paranoia ;-)

Contributor Author

With the (x, y) -> y merge function it's currently using, it will always select the last tx with a given txId from the tx stream when building the map, whereas in the original code getTx is calling findAny on the filtered getTxStream output, which will probably behave the same as findFirst in this case. So perhaps the merge function should be (x, y) -> x to be absolutely sure the behaviour doesn't change.

Also, for consistent merge behaviour, putIfAbsent would need to be substituted into the line:

daoState.getTxMap().put(tx.getId(), tx);

in DaoStateService.onNewTxForLastBlock.
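
A minimal sketch of the "first tx wins" variant described above, assuming duplicates could occur at all. `FirstWinsSketch` and its nested `Tx` are hypothetical; the `height` field exists only to tell two txs with the same id apart.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

final class FirstWinsSketch {
    // Hypothetical stand-in for the real Tx class.
    static final class Tx {
        final String id;
        final int height;

        Tx(String id, int height) {
            this.id = id;
            this.height = height;
        }

        String getId() {
            return id;
        }
    }

    // Old-style lookup: scan in encounter order and take the first match.
    static Optional<Tx> findFirstById(List<Tx> txs, String txId) {
        return txs.stream().filter(tx -> tx.getId().equals(txId)).findFirst();
    }

    // Index build: on a key conflict keep the value that was seen first.
    static Map<String, Tx> buildIndex(List<Tx> txs) {
        return txs.stream().collect(Collectors.toMap(
                Tx::getId,
                Function.identity(),
                (first, later) -> first,
                HashMap::new));
    }

    // Incremental update with the same policy: never overwrite an existing entry.
    static void addToIndex(Map<String, Tx> index, Tx tx) {
        index.putIfAbsent(tx.getId(), tx);
    }
}
```

With both the merge function and `putIfAbsent` in place, the index returns the same tx as a sequential first-match scan, regardless of whether a tx arrived during construction or later.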

}

@Override
@@ -237,6 +250,7 @@ public String toString() {
",\n paramChangeList=" + paramChangeList +
",\n evaluatedProposalList=" + evaluatedProposalList +
",\n decryptedBallotsWithMeritsList=" + decryptedBallotsWithMeritsList +
",\n txMap=" + txMap +
"\n}";
}
}