Skip to content

Commit

Permalink
Merge pull request ethereum#199 from nguyenbatam/duplicate_connection…
Browse files Browse the repository at this point in the history
…_with_each_peer

duplicate connection with each peer
  • Loading branch information
ngtuna authored Oct 23, 2018
2 parents 74670cb + 740e607 commit f7c2902
Show file tree
Hide file tree
Showing 19 changed files with 358 additions and 101 deletions.
21 changes: 10 additions & 11 deletions core/tx_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,19 +251,18 @@ func (l *txList) Overlaps(tx *types.Transaction) bool {
func (l *txList) Add(tx *types.Transaction, priceBump uint64) (bool, *types.Transaction) {
// If there's an older better transaction, abort
old := l.txs.Get(tx.Nonce())

if (tx.To() != nil && tx.To().String() != common.RandomizeSMC) || tx.To() == nil {
if old != nil {
threshold := new(big.Int).Div(new(big.Int).Mul(old.GasPrice(), big.NewInt(100+int64(priceBump))), big.NewInt(100))
// Have to ensure that the new gas price is higher than the old gas
// price as well as checking the percentage threshold to ensure that
// this is accurate for low (Wei-level) gas price replacements
if old.GasPrice().Cmp(tx.GasPrice()) >= 0 || threshold.Cmp(tx.GasPrice()) > 0 {
return false, nil
}
if old != nil && old.IsSpecialTransaction() {
return false, nil
}
if old != nil {
threshold := new(big.Int).Div(new(big.Int).Mul(old.GasPrice(), big.NewInt(100+int64(priceBump))), big.NewInt(100))
// Have to ensure that the new gas price is higher than the old gas
// price as well as checking the percentage threshold to ensure that
// this is accurate for low (Wei-level) gas price replacements
if old.GasPrice().Cmp(tx.GasPrice()) >= 0 || threshold.Cmp(tx.GasPrice()) > 0 {
return false, nil
}
}

// Otherwise overwrite the old transaction with the current one
l.txs.Put(tx)
if cost := tx.Cost(); l.costcap.Cmp(cost) < 0 {
Expand Down
86 changes: 73 additions & 13 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ var (
ErrOversizedData = errors.New("oversized data")

ErrZeroGasPrice = errors.New("zero gas price")

ErrDuplicateSpecialTransaction = errors.New("duplicate a specail transaction")
)

var (
Expand Down Expand Up @@ -186,16 +188,17 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
scope event.SubscriptionScope
chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
signer types.Signer
mu sync.RWMutex
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
specialTxFeed event.Feed
scope event.SubscriptionScope
chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
signer types.Signer
mu sync.RWMutex

currentState *state.StateDB // Current state in the blockchain head
pendingState *state.ManagedState // Pending state tracking virtual nonces
Expand Down Expand Up @@ -454,6 +457,12 @@ func (pool *TxPool) SubscribeTxPreEvent(ch chan<- TxPreEvent) event.Subscription
return pool.scope.Track(pool.txFeed.Subscribe(ch))
}

// SubscribeSpecialTxPreEvent registers a subscription of TxPreEvent and
// starts sending event to the given channel.
func (pool *TxPool) SubscribeSpecialTxPreEvent(ch chan<- TxPreEvent) event.Subscription {
return pool.scope.Track(pool.specialTxFeed.Subscribe(ch))
}

// GasPrice returns the current gas price enforced by the transaction pool.
func (pool *TxPool) GasPrice() *big.Int {
pool.mu.RLock()
Expand Down Expand Up @@ -576,7 +585,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
}
// Drop non-local transactions under our own minimal accepted gas price
local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network
if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
if !local && tx.To() != nil && !tx.IsSpecialTransaction() && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
return ErrUnderpriced
}
// Ensure the transaction adheres to nonce ordering
Expand All @@ -589,7 +598,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
return ErrInsufficientFunds
}

if tx.To() != nil && tx.To().String() != common.BlockSigners && tx.To().String() != common.RandomizeSMC {
if tx.To() != nil && !tx.IsSpecialTransaction() {
intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
if err != nil {
return err
Expand Down Expand Up @@ -646,8 +655,11 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
pool.removeTx(tx.Hash())
}
}
// If the transaction is replacing an already pending one, do directly
from, _ := types.Sender(pool.signer, tx) // already validated
if tx.IsSpecialTransaction() {
return pool.promoteSpecialTx(from, tx)
}
// If the transaction is replacing an already pending one, do directly
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
// Nonce already pending, check if required price bump is met
inserted, old := list.Add(tx, pool.config.PriceBump)
Expand Down Expand Up @@ -763,6 +775,54 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
go pool.txFeed.Send(TxPreEvent{tx})
}

func (pool *TxPool) promoteSpecialTx(addr common.Address, tx *types.Transaction) (bool, error) {
// Try to insert the transaction into the pending queue
if pool.pending[addr] == nil {
pool.pending[addr] = newTxList(true)
}
list := pool.pending[addr]
old := list.txs.Get(tx.Nonce())
if old != nil && old.IsSpecialTransaction() {
return false, ErrDuplicateSpecialTransaction
}
// Otherwise discard any previous transaction and mark this
if old != nil {
delete(pool.all, old.Hash())
pool.priced.Removed()
pendingReplaceCounter.Inc(1)
}
list.txs.Put(tx)
if cost := tx.Cost(); list.costcap.Cmp(cost) < 0 {
list.costcap = cost
}
if gas := tx.Gas(); list.gascap < gas {
list.gascap = gas
}
// Failsafe to work around direct pending inserts (tests)
if pool.all[tx.Hash()] == nil {
pool.all[tx.Hash()] = tx
}
// Set the potentially new pending nonce and notify any subsystems of the new tx
pool.beats[addr] = time.Now()
pool.pendingState.SetNonce(addr, tx.Nonce()+1)
broadcastTxs := types.Transactions{}
for i := tx.Nonce() - 1; i > 0; i-- {
before := list.txs.Get(i)
if before == nil || before.IsSpecialTransaction() {
break
}
broadcastTxs = append(broadcastTxs, before)
}
broadcastTxs = append(broadcastTxs, tx)
go func() {
for _, btx := range broadcastTxs {
pool.specialTxFeed.Send(TxPreEvent{btx})
log.Debug("Pooled new special transaction", "hash", tx.Hash(), "from", addr, "to", tx.To(), "nonce", tx.Nonce())
}
}()
return true, nil
}

// AddLocal enqueues a single transaction into the pool if it is valid, marking
// the sender as a local one in the mean time, ensuring it goes around the local
// pricing constraints.
Expand Down
41 changes: 34 additions & 7 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,13 @@ func (tx *Transaction) RawSignatureValues() (*big.Int, *big.Int, *big.Int) {
return tx.data.V, tx.data.R, tx.data.S
}

func (tx *Transaction) IsSpecialTransaction() bool {
if tx.To() == nil {
return false
}
return tx.To().String() == common.RandomizeSMC || tx.To().String() == common.BlockSigners
}

func (tx *Transaction) String() string {
var from, to string
if tx.data.V != nil {
Expand Down Expand Up @@ -395,14 +402,34 @@ type TransactionsByPriceAndNonce struct {
//
// Note, the input map is reowned so the caller should not interact any more with
// if after providing it to the constructor.
func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transactions) *TransactionsByPriceAndNonce {

// It also classifies special txs and normal txs
func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transactions) (*TransactionsByPriceAndNonce, Transactions) {
// Initialize a price based heap with the head transactions
heads := make(TxByPrice, 0, len(txs))
heads := TxByPrice{}
specialTxs := Transactions{}
for _, accTxs := range txs {
heads = append(heads, accTxs[0])
// Ensure the sender address is from the signer
acc, _ := Sender(signer, accTxs[0])
txs[acc] = accTxs[1:]
var normalTxs Transactions
lastSpecialTx := -1
for i, tx := range accTxs {
if tx.IsSpecialTransaction() {
lastSpecialTx = i
}
}
if lastSpecialTx >= 0 {
for i := 0; i <= lastSpecialTx; i++ {
specialTxs = append(specialTxs, accTxs[i])
}
normalTxs = accTxs[lastSpecialTx+1:]
} else {
normalTxs = accTxs
}
if len(normalTxs) > 0 {
acc, _ := Sender(signer, normalTxs[0])
heads = append(heads, normalTxs[0])
// Ensure the sender address is from the signer
txs[acc] = normalTxs[1:]
}
}
heap.Init(&heads)

Expand All @@ -411,7 +438,7 @@ func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transa
txs: txs,
heads: heads,
signer: signer,
}
}, specialTxs
}

// Peek returns the next transaction by price.
Expand Down
2 changes: 1 addition & 1 deletion core/types/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestTransactionPriceNonceSort(t *testing.T) {
}
}
// Sort the transactions and cross check the nonce ordering
txset := NewTransactionsByPriceAndNonce(signer, groups)
txset, _ := NewTransactionsByPriceAndNonce(signer, groups)

txs := Transactions{}
for tx := txset.Peek(); tx != nil; tx = txset.Peek() {
Expand Down
1 change: 0 additions & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
}
return penSigners, nil
}

return []common.Address{}, nil
}

Expand Down
83 changes: 58 additions & 25 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type ProtocolManager struct {
eventMux *event.TypeMux
txCh chan core.TxPreEvent
txSub event.Subscription
specialTxCh chan core.TxPreEvent
specialTxSub event.Subscription
minedBlockSub *event.TypeMuxSubscription

// channels for fetcher, syncer, txsyncLoop
Expand Down Expand Up @@ -208,6 +210,11 @@ func (pm *ProtocolManager) Start(maxPeers int) {
pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh)
go pm.txBroadcastLoop()

// broadcast special transactions
pm.specialTxCh = make(chan core.TxPreEvent, txChanSize)
pm.specialTxSub = pm.txpool.SubscribeSpecialTxPreEvent(pm.specialTxCh)
go pm.specialTxBroadcastLoop()

// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
Expand All @@ -221,6 +228,7 @@ func (pm *ProtocolManager) Stop() {
log.Info("Stopping Ethereum protocol")

pm.txSub.Unsubscribe() // quits txBroadcastLoop
pm.specialTxSub.Unsubscribe() // quits specialTxBroadcastLoop
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop

// Quit the sync loop.
Expand Down Expand Up @@ -271,38 +279,40 @@ func (pm *ProtocolManager) handle(p *peer) error {
rw.Init(p.version)
}
// Register the peer locally
if err := pm.peers.Register(p); err != nil {
err := pm.peers.Register(p)
if err != nil && err != p2p.ErrAddPairPeer {
p.Log().Error("Ethereum peer registration failed", "err", err)
return err
}
defer pm.removePeer(p.id)

// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
return err
}
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
pm.syncTransactions(p)

// If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
// Request the peer's DAO fork header for extra-data validation
if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
if err != p2p.ErrAddPairPeer {
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
return err
}
// Start a timer to disconnect if the peer doesn't reply in time
p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
p.Log().Debug("Timed out DAO fork-check, dropping")
pm.removePeer(p.id)
})
// Make sure it's cleaned up if the peer dies off
defer func() {
if p.forkDrop != nil {
p.forkDrop.Stop()
p.forkDrop = nil
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
pm.syncTransactions(p)

// If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
// Request the peer's DAO fork header for extra-data validation
if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
return err
}
}()
// Start a timer to disconnect if the peer doesn't reply in time
p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
p.Log().Debug("Timed out DAO fork-check, dropping")
pm.removePeer(p.id)
})
// Make sure it's cleaned up if the peer dies off
defer func() {
if p.forkDrop != nil {
p.forkDrop.Stop()
p.forkDrop = nil
}
}()
}
}
// main loop. handle incoming messages.
for {
Expand Down Expand Up @@ -724,6 +734,16 @@ func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction)
log.Trace("Broadcast transaction", "hash", hash, "recipients", len(peers))
}

func (pm *ProtocolManager) BroadcastSpecialTx(hash common.Hash, tx *types.Transaction) {
// Broadcast transaction to a batch of peers not knowing about it
peers := pm.peers.PeersWithoutTx(hash)
//FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
for _, peer := range peers {
peer.SendSpecialTransactions(tx)
}
log.Debug("Broadcast special transaction", "hash", hash, "recipients", len(peers))
}

// Mined broadcast loop
func (self *ProtocolManager) minedBroadcastLoop() {
// automatically stops if unsubscribe
Expand All @@ -749,6 +769,19 @@ func (self *ProtocolManager) txBroadcastLoop() {
}
}

func (self *ProtocolManager) specialTxBroadcastLoop() {
for {
select {
case event := <-self.specialTxCh:
self.BroadcastSpecialTx(event.Tx.Hash(), event.Tx)

// Err() channel will be closed when unsubscribing.
case <-self.specialTxSub.Err():
return
}
}
}

// NodeInfo represents a short summary of the Ethereum sub-protocol metadata
// known about the host peer.
type NodeInfo struct {
Expand Down
4 changes: 4 additions & 0 deletions eth/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ func (p *testTxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscr
return p.txFeed.Subscribe(ch)
}

func (p *testTxPool) SubscribeSpecialTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
return p.txFeed.Subscribe(ch)
}

// newTestTransaction create a new dummy transaction.
func newTestTransaction(from *ecdsa.PrivateKey, nonce uint64, datasize int) *types.Transaction {
tx := types.NewTransaction(nonce, common.Address{}, big.NewInt(0), 100000, big.NewInt(0), make([]byte, datasize))
Expand Down
Loading

0 comments on commit f7c2902

Please sign in to comment.