Skip to content

Commit

Permalink
Update flips distribution (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
sidenaio authored Aug 21, 2019
1 parent bb32fa2 commit 6c438e4
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 33 deletions.
17 changes: 3 additions & 14 deletions api/flip_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,21 @@ import (
"github.com/idena-network/idena-go/core/flip"
"github.com/idena-network/idena-go/core/state"
"github.com/idena-network/idena-go/ipfs"
"github.com/idena-network/idena-go/protocol"
"github.com/ipfs/go-cid"
"github.com/pkg/errors"
"github.com/shopspring/decimal"
)

const (
MaxFlipSize = 1024 * 600
)

type FlipApi struct {
baseApi *BaseApi
fp *flip.Flipper
pm *protocol.ProtocolManager
ipfsProxy ipfs.Proxy
ceremony *ceremony.ValidationCeremony
}

// NewFlipApi creates a new FlipApi instance
func NewFlipApi(baseApi *BaseApi, fp *flip.Flipper, pm *protocol.ProtocolManager, ipfsProxy ipfs.Proxy, ceremony *ceremony.ValidationCeremony) *FlipApi {
return &FlipApi{baseApi, fp, pm, ipfsProxy, ceremony}
func NewFlipApi(baseApi *BaseApi, fp *flip.Flipper, ipfsProxy ipfs.Proxy, ceremony *ceremony.ValidationCeremony) *FlipApi {
return &FlipApi{baseApi, fp, ipfsProxy, ceremony}
}

type FlipSubmitResponse struct {
Expand All @@ -44,6 +38,7 @@ type FlipSubmitArgs struct {
}

func (api *FlipApi) Submit(i *json.RawMessage) (FlipSubmitResponse, error) {

//TODO: remove this after desktop updating
// temp code start
args := &FlipSubmitArgs{}
Expand All @@ -64,10 +59,6 @@ func (api *FlipApi) Submit(i *json.RawMessage) (FlipSubmitResponse, error) {

rawFlip := *args.Hex

if len(rawFlip) > MaxFlipSize {
return FlipSubmitResponse{}, errors.Errorf("flip is too big, max expected size %v, actual %v", MaxFlipSize, len(rawFlip))
}

cid, encryptedFlip, err := api.fp.PrepareFlip(rawFlip, args.Pair)

if err != nil {
Expand All @@ -92,8 +83,6 @@ func (api *FlipApi) Submit(i *json.RawMessage) (FlipSubmitResponse, error) {
return FlipSubmitResponse{}, err
}

api.pm.BroadcastFlip(&flip)

return FlipSubmitResponse{
TxHash: tx.Hash(),
Hash: cid.String(),
Expand Down
31 changes: 27 additions & 4 deletions core/flip/flipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (
"crypto/rand"
"github.com/idena-network/idena-go/blockchain/types"
"github.com/idena-network/idena-go/common"
"github.com/idena-network/idena-go/common/eventbus"
"github.com/idena-network/idena-go/core/appstate"
"github.com/idena-network/idena-go/core/mempool"
"github.com/idena-network/idena-go/crypto"
"github.com/idena-network/idena-go/crypto/ecies"
"github.com/idena-network/idena-go/database"
"github.com/idena-network/idena-go/events"
"github.com/idena-network/idena-go/ipfs"
"github.com/idena-network/idena-go/log"
"github.com/idena-network/idena-go/rlp"
Expand All @@ -22,6 +24,10 @@ import (
"sync"
)

const (
MaxFlipSize = 1024 * 600
)

type Flipper struct {
epochDb *database.EpochDb
db dbm.DB
Expand All @@ -37,14 +43,15 @@ type Flipper struct {
txpool *mempool.TxPool
loadingCtx context.Context
cancelLoadingCtx context.CancelFunc
bus eventbus.Bus
}
type IpfsFlip struct {
Data []byte
PubKey []byte
Pair uint8
}

func NewFlipper(db dbm.DB, ipfsProxy ipfs.Proxy, keyspool *mempool.KeysPool, txpool *mempool.TxPool, secStore *secstore.SecStore, appState *appstate.AppState) *Flipper {
func NewFlipper(db dbm.DB, ipfsProxy ipfs.Proxy, keyspool *mempool.KeysPool, txpool *mempool.TxPool, secStore *secstore.SecStore, appState *appstate.AppState, bus eventbus.Bus) *Flipper {
ctx, cancel := context.WithCancel(context.Background())
fp := &Flipper{
db: db,
Expand All @@ -58,6 +65,7 @@ func NewFlipper(db dbm.DB, ipfsProxy ipfs.Proxy, keyspool *mempool.KeysPool, txp
appState: appState,
loadingCtx: ctx,
cancelLoadingCtx: cancel,
bus: bus,
}

return fp
Expand All @@ -83,6 +91,10 @@ func (fp *Flipper) AddNewFlip(flip types.Flip, local bool) error {

data, _ := rlp.EncodeToBytes(ipf)

if len(data) > MaxFlipSize {
return errors.Errorf("flip is too big, max expected size %v, actual %v", MaxFlipSize, len(data))
}

c, err := fp.ipfsProxy.Cid(data)

if err != nil {
Expand All @@ -97,18 +109,29 @@ func (fp *Flipper) AddNewFlip(flip types.Flip, local bool) error {
return errors.Errorf("tx cid and flip cid mismatch, tx: %v", flip.Tx.Hash())
}

if err := fp.txpool.Add(flip.Tx); err != nil && err != mempool.DuplicateTxError {
if err := fp.txpool.Validate(flip.Tx); err != nil && err != mempool.DuplicateTxError {
log.Warn("Flip Tx is not valid", "hash", flip.Tx.Hash().Hex(), "err", err)
return err
}

fp.bus.Publish(&events.NewFlipEvent{Flip: &flip})

key, err := fp.ipfsProxy.Add(data)

if err != nil {
return err
}
fp.epochDb.WriteFlipCid(c.Bytes())

if local {
fp.ipfsProxy.Pin(key.Bytes())
if err := fp.ipfsProxy.Pin(key.Bytes()); err != nil {
return err
}
}

fp.epochDb.WriteFlipCid(c.Bytes())
if err := fp.txpool.Add(flip.Tx); err != nil && err != mempool.DuplicateTxError {
return err
}

return err
}
Expand Down
31 changes: 21 additions & 10 deletions core/mempool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,8 @@ func (txpool *TxPool) addDeferredTx(tx *types.Transaction) {
txpool.knownDeferredTxs.Add(tx.Hash())
}

func (txpool *TxPool) Add(tx *types.Transaction) error {

txpool.mutex.Lock()
defer txpool.mutex.Unlock()
func (txpool *TxPool) Validate(tx *types.Transaction) error {

if txpool.isSyncing {
txpool.addDeferredTx(tx)
return nil
}
if err := txpool.checkTotalTxLimit(); err != nil {
return err
}
Expand All @@ -107,11 +100,29 @@ func (txpool *TxPool) Add(tx *types.Transaction) error {

appState := txpool.appState.Readonly(txpool.head.Height())

if err := validation.ValidateTx(appState, tx, true); err != nil {
log.Warn("Tx is not valid", "hash", tx.Hash().Hex(), "err", err)
return validation.ValidateTx(appState, tx, true)
}

func (txpool *TxPool) Add(tx *types.Transaction) error {

txpool.mutex.Lock()
defer txpool.mutex.Unlock()

if txpool.isSyncing {
txpool.addDeferredTx(tx)
return nil
}

if err := txpool.Validate(tx); err != nil {
if err != DuplicateTxError {
log.Warn("Tx is not valid", "hash", tx.Hash().Hex(), "err", err)
}
return err
}

hash := tx.Hash()
sender, _ := types.Sender(tx)

txpool.pending[hash] = tx
senderPending := txpool.pendingPerAddr[sender]
if senderPending == nil {
Expand Down
9 changes: 9 additions & 0 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const (
AddBlockEventID = eventbus.EventID("block-add")
NewFlipKeyID = eventbus.EventID("flip-key-new")
FastSyncCompleted = eventbus.EventID("fast-sync-completed")
NewFlipEventID = eventbus.EventID("flip-new")
)

type NewTxEvent struct {
Expand Down Expand Up @@ -42,3 +43,11 @@ type FastSyncCompletedEvent struct {
func (FastSyncCompletedEvent) EventID() eventbus.EventID {
return FastSyncCompleted
}

type NewFlipEvent struct {
Flip *types.Flip
}

func (NewFlipEvent) EventID() eventbus.EventID {
return NewFlipEventID
}
4 changes: 2 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func NewNode(config *config.Config) (*Node, error) {
offlineDetector := blockchain.NewOfflineDetector(config.OfflineDetection, db, appState, secStore, bus)
chain := blockchain.NewBlockchain(config, db, txpool, appState, ipfsProxy, secStore, bus, offlineDetector)
proposals := pengings.NewProposals(chain, offlineDetector)
flipper := flip.NewFlipper(db, ipfsProxy, flipKeyPool, txpool, secStore, appState)
flipper := flip.NewFlipper(db, ipfsProxy, flipKeyPool, txpool, secStore, appState, bus)
pm := protocol.NetProtocolManager(chain, proposals, votes, txpool, flipper, bus, flipKeyPool, config.P2P)
sm := state.NewSnapshotManager(db, appState.State, bus, ipfsProxy, config)
downloader := protocol.NewDownloader(pm, config, chain, ipfsProxy, appState, sm, bus, secStore)
Expand Down Expand Up @@ -266,7 +266,7 @@ func (node *Node) apis() []rpc.API {
{
Namespace: "flip",
Version: "1.0",
Service: api.NewFlipApi(baseApi, node.fp, node.pm, node.ipfsProxy, node.ceremony),
Service: api.NewFlipApi(baseApi, node.fp, node.ipfsProxy, node.ceremony),
Public: true,
},
{
Expand Down
8 changes: 5 additions & 3 deletions protocol/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ func (pm *ProtocolManager) Start() {
newFlipKeyEvent := e.(*events.NewFlipKeyEvent)
pm.flipKeyChan <- newFlipKeyEvent.Key
})
_ = pm.bus.Subscribe(events.NewFlipEventID, func(e eventbus.Event) {
newFlipEvent := e.(*events.NewFlipEvent)
pm.broadcastFlip(newFlipEvent.Flip)
})

go pm.broadcastLoop()
}
Expand Down Expand Up @@ -222,8 +226,6 @@ func (pm *ProtocolManager) handle(p *peer) error {
p.markFlip(&flip)
if err := pm.flipper.AddNewFlip(flip, false); err != nil {
p.Log().Debug("invalid flip", "err", err)
} else {
pm.BroadcastFlip(&flip)
}
case FlipKey:
var flipKey types.FlipKey
Expand Down Expand Up @@ -421,7 +423,7 @@ func (pm *ProtocolManager) BroadcastTx(transaction *types.Transaction) {
}
}

func (pm *ProtocolManager) BroadcastFlip(flip *types.Flip) {
func (pm *ProtocolManager) broadcastFlip(flip *types.Flip) {
for _, peer := range pm.peers.PeersWithoutFlip(flip.Tx.Hash()) {
peer.SendFlipAsync(flip)
}
Expand Down

0 comments on commit 6c438e4

Please sign in to comment.