Skip to content

Commit

Permalink
Merge pull request #892 from MonsieurNicolas/pendingTransactions
Browse files Browse the repository at this point in the history
Pending transactions

Reviewed-by: jedmccaleb
  • Loading branch information
latobarita committed Oct 30, 2015
2 parents 7da3a14 + 88fdffb commit 73147b7
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 95 deletions.
6 changes: 3 additions & 3 deletions docs/stellar-core_example.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ KNOWN_PEERS=[
#
# stellar-core --convertid <seed>
#
# automatically adds a common name `self` to NODE_NAMES list if you specify
# a NODE_SEED
NODE_SEED="SBI3CZU7XZEWVXU7OZLW5MMUQAP334JFOPXSLTPOH43IRTEQ2QYXU5RG"
# This example also adds a common name to NODE_NAMES list named `self` with the
# public key associated to this seed
NODE_SEED="SBI3CZU7XZEWVXU7OZLW5MMUQAP334JFOPXSLTPOH43IRTEQ2QYXU5RG self"

# NODE_IS_VALIDATOR (boolean) default false.
# Only nodes that want to participate in SCP should set NODE_IS_VALIDATOR=true.
Expand Down
2 changes: 1 addition & 1 deletion docs/stellar-core_standalone.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ RUN_STANDALONE=true

NETWORK_PASSPHRASE="Test SDF Network ; September 2015"

NODE_SEED="SDQVDISRYN2JXBS7ICL7QJAEKB3HWBJFP2QECXG7GZICAHBK4UNJCWK2"
NODE_SEED="SDQVDISRYN2JXBS7ICL7QJAEKB3HWBJFP2QECXG7GZICAHBK4UNJCWK2 self"
NODE_IS_VALIDATOR=true

#DATABASE="postgresql://dbname=stellar user=postgres password=password host=localhost"
Expand Down
106 changes: 48 additions & 58 deletions src/herder/HerderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ HerderImpl::SCPMetrics::SCPMetrics(Application& app)
HerderImpl::HerderImpl(Application& app)
: mSCP(*this, app.getConfig().NODE_SEED, app.getConfig().NODE_IS_VALIDATOR,
app.getConfig().QUORUM_SET)
, mReceivedTransactions(4)
, mPendingTransactions(4)
, mPendingEnvelopes(app, *this)
, mLastSlotSaved(0)
, mLastStateChange(app.getClock().now())
Expand Down Expand Up @@ -617,65 +617,14 @@ HerderImpl::valueExternalized(uint64 slotIndex, Value const& value)
mLedgerManager.externalizeValue(ledgerData);

// perform cleanups

// remove all these tx from mReceivedTransactions
removeReceivedTxs(externalizedSet->mTransactions);

// rebroadcast those left in set 1, sorted in an apply-order.
{
Hash h;
TxSetFrame broadcast(h);
assert(mReceivedTransactions.size() >= 2);
for (auto const& pair : mReceivedTransactions[1])
{
for (auto const& tx : pair.second->mTransactions)
{
broadcast.add(tx.second);
}
}
for (auto tx : broadcast.sortForApply())
{
auto msg = tx->toStellarMessage();
mApp.getOverlayManager().broadcastMessage(msg);
}
}
updatePendingTransactions(externalizedSet->mTransactions);

// Evict slots that are outside of our ledger validity bracket
if (slotIndex > MAX_SLOTS_TO_REMEMBER)
{
mSCP.purgeSlots(slotIndex - MAX_SLOTS_TO_REMEMBER);
}

assert(mReceivedTransactions.size() >= 4);

// Move all the remaining to the next highest level don't move the
// largest array.
for (size_t n = mReceivedTransactions.size() - 1; n > 0; n--)
{
auto& curr = mReceivedTransactions[n];
auto& prev = mReceivedTransactions[n - 1];
for (auto const& pair : prev)
{
auto const& acc = pair.first;
auto const& srcmap = pair.second;
auto dstmap = findOrAdd(curr, acc);
for (auto tx : srcmap->mTransactions)
{
dstmap->addTx(tx.second);
}
}
prev.clear();
}

mSCPMetrics.mHerderPendingTxs0.set_count(
countTxs(mReceivedTransactions[0]));
mSCPMetrics.mHerderPendingTxs1.set_count(
countTxs(mReceivedTransactions[1]));
mSCPMetrics.mHerderPendingTxs2.set_count(
countTxs(mReceivedTransactions[2]));
mSCPMetrics.mHerderPendingTxs3.set_count(
countTxs(mReceivedTransactions[3]));

ledgerClosed();
}

Expand Down Expand Up @@ -964,7 +913,7 @@ HerderImpl::recvTransaction(TransactionFramePtr tx)
int64_t totFee = tx->getFee();
SequenceNumber highSeq = 0;

for (auto& map : mReceivedTransactions)
for (auto& map : mPendingTransactions)
{
auto i = map.find(acc);
if (i != map.end())
Expand Down Expand Up @@ -994,7 +943,7 @@ HerderImpl::recvTransaction(TransactionFramePtr tx)
CLOG(TRACE, "Herder") << "recv transaction " << hexAbbrev(txID) << " for "
<< PubKeyUtils::toShortString(acc);

auto txmap = findOrAdd(mReceivedTransactions[0], acc);
auto txmap = findOrAdd(mPendingTransactions[0], acc);
txmap->addTx(tx);

return TX_STATUS_PENDING;
Expand Down Expand Up @@ -1224,7 +1173,7 @@ HerderImpl::ledgerClosed()
void
HerderImpl::removeReceivedTxs(std::vector<TransactionFramePtr> const& dropTxs)
{
for (auto& m : mReceivedTransactions)
for (auto& m : mPendingTransactions)
{
if (m.empty())
{
Expand Down Expand Up @@ -1316,7 +1265,7 @@ SequenceNumber
HerderImpl::getMaxSeqInPendingTxs(AccountID const& acc)
{
SequenceNumber highSeq = 0;
for (auto const& m : mReceivedTransactions)
for (auto const& m : mPendingTransactions)
{
auto i = m.find(acc);
if (i == m.end())
Expand Down Expand Up @@ -1346,7 +1295,7 @@ HerderImpl::triggerNextLedger(uint32_t ledgerSeqToTrigger)
auto const& lcl = mLedgerManager.getLastClosedLedgerHeader();
TxSetFramePtr proposedSet = std::make_shared<TxSetFrame>(lcl.hash);

for (auto const& m : mReceivedTransactions)
for (auto const& m : mPendingTransactions)
{
for (auto const& pair : m)
{
Expand Down Expand Up @@ -1627,6 +1576,47 @@ HerderImpl::trackingHeartBeat()
&VirtualTimer::onFailureNoop);
}

void
HerderImpl::updatePendingTransactions(
std::vector<TransactionFramePtr> const& applied)
{
// remove all these tx from mPendingTransactions
removeReceivedTxs(applied);

// drop the highest level
mPendingTransactions.erase(--mPendingTransactions.end());

// shift entries up
mPendingTransactions.emplace_front();

// rebroadcast entries, sorted in apply-order to maximize chances of
// propagation
{
Hash h;
TxSetFrame toBroadcast(h);
for (auto const& l : mPendingTransactions)
{
for (auto const& pair : l)
{
for (auto const& tx : pair.second->mTransactions)
{
toBroadcast.add(tx.second);
}
}
}
for (auto tx : toBroadcast.sortForApply())
{
auto msg = tx->toStellarMessage();
mApp.getOverlayManager().broadcastMessage(msg);
}
}

mSCPMetrics.mHerderPendingTxs0.set_count(countTxs(mPendingTransactions[0]));
mSCPMetrics.mHerderPendingTxs1.set_count(countTxs(mPendingTransactions[1]));
mSCPMetrics.mHerderPendingTxs2.set_count(countTxs(mPendingTransactions[2]));
mSCPMetrics.mHerderPendingTxs3.set_count(countTxs(mPendingTransactions[3]));
}

void
HerderImpl::herderOutOfSync()
{
Expand Down
9 changes: 7 additions & 2 deletions src/herder/HerderImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include <vector>
#include <deque>
#include <unordered_map>
#include <memory>
#include "herder/Herder.h"
Expand Down Expand Up @@ -159,8 +160,12 @@ class HerderImpl : public Herder, public SCPDriver

// 0- tx we got during ledger close
// 1- one ledger ago. rebroadcast
// 2- two ledgers ago.
std::vector<AccountTxMap> mReceivedTransactions;
// 2- two ledgers ago. rebroadcast
// ...
std::deque<AccountTxMap> mPendingTransactions;

void
updatePendingTransactions(std::vector<TransactionFramePtr> const& applied);

PendingEnvelopes mPendingEnvelopes;

Expand Down
87 changes: 56 additions & 31 deletions src/main/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,15 +370,9 @@ Config::load(std::string const& filename)
throw std::invalid_argument("invalid NODE_SEED");
}

std::string seed = item.second->as<std::string>()->value();
NODE_SEED = SecretKey::fromStrKeySeed(seed);
if (!VALIDATOR_NAMES.insert(std::make_pair(
NODE_SEED.getStrKeyPublic(),
"self")).second)
{
throw std::invalid_argument(
"`self` is a reserved name for NODE_NAMES");
}
PublicKey nodeID;
parseNodeID(item.second->as<std::string>()->value(), nodeID,
NODE_SEED, true);
}
else if (item.first == "NODE_IS_VALIDATOR")
{
Expand Down Expand Up @@ -425,24 +419,7 @@ Config::load(std::string const& filename)
}
else if (item.first == "PREFERRED_PEER_KEYS")
{
if (!item.second->is_array())
{
throw std::invalid_argument(
"PREFERRED_PEER_KEYS must be an array");
}
for (auto v : item.second->as_array()->array())
{
if (!v->as<std::string>())
{
throw std::invalid_argument(
"invalid element of PREFERRED_PEER_KEYS");
}

PublicKey nodeID;
parseNodeID(v->as<std::string>()->value(), nodeID);
PREFERRED_PEER_KEYS.push_back(
PubKeyUtils::toStrKey(nodeID));
}
// handled below
}
else if (item.first == "PREFERRED_PEERS_ONLY")
{
Expand Down Expand Up @@ -591,10 +568,37 @@ Config::load(std::string const& filename)
}
}
// process elements that potentially depend on others
auto qset = g.get("QUORUM_SET");
if (qset)
if (g.contains("PREFERRED_PEER_KEYS"))
{
auto pkeys = g.get("PREFERRED_PEER_KEYS");
if (pkeys)
{
if (!pkeys->is_array())
{
throw std::invalid_argument(
"PREFERRED_PEER_KEYS must be an array");
}
for (auto v : pkeys->as_array()->array())
{
if (!v->as<std::string>())
{
throw std::invalid_argument(
"invalid element of PREFERRED_PEER_KEYS");
}
PublicKey nodeID;
parseNodeID(v->as<std::string>()->value(), nodeID);
PREFERRED_PEER_KEYS.push_back(
PubKeyUtils::toStrKey(nodeID));
}
}
}
if (g.contains("QUORUM_SET"))
{
loadQset(qset->as_group(), QUORUM_SET, 0);
auto qset = g.get("QUORUM_SET");
if (qset)
{
loadQset(qset->as_group(), QUORUM_SET, 0);
}
}

validateConfig();
Expand Down Expand Up @@ -679,13 +683,25 @@ Config::validateConfig()

void
Config::parseNodeID(std::string configStr, PublicKey& retKey)
{
SecretKey k;
parseNodeID(configStr, retKey, k, false);
}

void
Config::parseNodeID(std::string configStr, PublicKey& retKey, SecretKey& sKey,
bool isSeed)
{
if (configStr.size() < 2)
throw std::invalid_argument("invalid key");

// check if configStr is a PublicKey or a common name
if (configStr[0] == '$')
{
if (isSeed)
{
throw std::invalid_argument("aliases only store public keys");
}
if (!resolveNodeID(configStr, retKey))
{
throw std::invalid_argument("unknown key in config");
Expand All @@ -696,7 +712,16 @@ Config::parseNodeID(std::string configStr, PublicKey& retKey)
std::istringstream iss(configStr);
std::string nodestr;
iss >> nodestr;
retKey = PubKeyUtils::fromStrKey(nodestr);
if (isSeed)
{
sKey = SecretKey::fromStrKeySeed(nodestr);
retKey = sKey.getPublicKey();
nodestr = sKey.getStrKeyPublic();
}
else
{
retKey = PubKeyUtils::fromStrKey(nodestr);
}

if (iss)
{ // get any common name they have added
Expand Down
2 changes: 2 additions & 0 deletions src/main/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ class Config : public std::enable_shared_from_this<Config>
SCPQuorumSet& qset, int level);

void parseNodeID(std::string configStr, PublicKey& retKey);
void parseNodeID(std::string configStr, PublicKey& retKey, SecretKey& sKey,
bool isSeed);

public:
typedef std::shared_ptr<Config> pointer;
Expand Down

0 comments on commit 73147b7

Please sign in to comment.