Skip to content

Commit

Permalink
MAT-865: Add subscribe deposit event filter (ethereum#21)
Browse files Browse the repository at this point in the history
* add eth_depositById rpc method

* add sample depositById test

* change ethereum to bor

* add subscription for new deposit data

* create channel to listen new state change

* push state data to subcribed channel

* apply filter on deposit events sub

* remove unused methods

* Fix: no filter

* Remove unused method

* revert changes

* Fix: RPC port

* refactor and cleanup

* Fix: SubscribeStateEvent

* remove unused import

* Resolve comments

Co-authored-by: Arpit Agarwal <[email protected]>
  • Loading branch information
ptsayli and atvanguard authored Apr 6, 2020
1 parent c6881be commit 260243c
Show file tree
Hide file tree
Showing 13 changed files with 158 additions and 28 deletions.
20 changes: 20 additions & 0 deletions consensus/bor/bor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/maticnetwork/bor/core/vm"
"github.com/maticnetwork/bor/crypto"
"github.com/maticnetwork/bor/ethdb"
"github.com/maticnetwork/bor/event"
"github.com/maticnetwork/bor/internal/ethapi"
"github.com/maticnetwork/bor/log"
"github.com/maticnetwork/bor/params"
Expand Down Expand Up @@ -245,6 +246,8 @@ type Bor struct {
stateReceiverABI abi.ABI
HeimdallClient IHeimdallClient

stateDataFeed event.Feed
scope event.SubscriptionScope
// The fields below are for testing only
fakeDiff bool // Skip difficulty verifications
}
Expand Down Expand Up @@ -1158,6 +1161,7 @@ func (c *Bor) CommitStates(
header *types.Header,
chain core.ChainContext,
) error {
fmt.Println("comminting state")
// get pending state proposals
stateIds, err := c.GetPendingStateProposals(header.Number.Uint64() - 1)
if err != nil {
Expand Down Expand Up @@ -1201,6 +1205,16 @@ func (c *Bor) CommitStates(
"txHash", eventRecord.TxHash,
"chainID", eventRecord.ChainID,
)
stateData := types.StateData{
Did: eventRecord.ID,
Contract: eventRecord.Contract,
Data: hex.EncodeToString(eventRecord.Data),
TxHash: eventRecord.TxHash,
}

go func() {
c.stateDataFeed.Send(core.NewStateChangeEvent{StateData: &stateData})
}()

recordBytes, err := rlp.EncodeToBytes(eventRecord)
if err != nil {
Expand All @@ -1226,6 +1240,12 @@ func (c *Bor) CommitStates(
return nil
}


// SubscribeStateEvent registers a subscription of ChainSideEvent.
func (c *Bor) SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription {
return c.scope.Track(c.stateDataFeed.Subscribe(ch))
}

func (c *Bor) SetHeimdallClient(h IHeimdallClient) {
c.HeimdallClient = h
}
Expand Down
4 changes: 4 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type ChainEvent struct {
Logs []*types.Log
}

type NewStateChangeEvent struct {
StateData *types.StateData
}

type ChainSideEvent struct {
Block *types.Block
}
Expand Down
12 changes: 12 additions & 0 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ type txdata struct {
Hash *common.Hash `json:"hash" rlp:"-"`
}

// State represents state received from Ethereum Blockchain
type StateData struct {
Did uint64
Contract common.Address
Data string
TxHash common.Hash
}

type txdataMarshaling struct {
AccountNonce hexutil.Uint64
Price *hexutil.Big
Expand All @@ -71,6 +79,10 @@ type txdataMarshaling struct {
S *hexutil.Big
}

func (sd *StateData) StateData() *StateData {
return sd
}

func NewTransaction(nonce uint64, to common.Address, amount *big.Int, gasLimit uint64, gasPrice *big.Int, data []byte) *Transaction {
return newTransaction(nonce, &to, amount, gasLimit, gasPrice, data)
}
Expand Down
6 changes: 6 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/maticnetwork/bor/accounts"
"github.com/maticnetwork/bor/common"
"github.com/maticnetwork/bor/common/math"
"github.com/maticnetwork/bor/consensus/bor"
"github.com/maticnetwork/bor/core"
"github.com/maticnetwork/bor/core/bloombits"
"github.com/maticnetwork/bor/core/rawdb"
Expand Down Expand Up @@ -155,6 +156,11 @@ func (b *EthAPIBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) e
return b.eth.BlockChain().SubscribeChainSideEvent(ch)
}

func (b *EthAPIBackend) SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription {
engine := b.eth.Engine()
return engine.(*bor.Bor).SubscribeStateEvent(ch)
}

func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return b.eth.BlockChain().SubscribeLogsEvent(ch)
}
Expand Down
33 changes: 32 additions & 1 deletion eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er
go func() {
headers := make(chan *types.Header)
headersSub := api.events.SubscribeNewHeads(headers)

for {
select {
case h := <-headers:
Expand All @@ -233,6 +232,38 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er
return rpcSub, nil
}

// NewDeposits send a notification each time a new deposit received from bridge.
func (api *PublicFilterAPI) NewDeposits(ctx context.Context, crit ethereum.FilterState) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}

rpcSub := notifier.CreateSubscription()
go func() {
stateData := make(chan *types.StateData)
stateDataSub := api.events.SubscribeNewDeposits(stateData)

for {
select {
case h := <-stateData:
if crit.Did == h.Did || crit.Contract == h.Contract ||
(crit.Did == 0 && crit.Contract == common.Address{}) {
notifier.Notify(rpcSub.ID, h)
}
case <-rpcSub.Err():
stateDataSub.Unsubscribe()
return
case <-notifier.Closed():
stateDataSub.Unsubscribe()
return
}
}
}()

return rpcSub, nil
}

// Logs creates a subscription that fires for all new log that match the given filter criteria.
func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
Expand Down
1 change: 1 addition & 0 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Backend interface {

SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeStateEvent(ch chan<- core.NewStateChangeEvent) event.Subscription
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription

Expand Down
50 changes: 43 additions & 7 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ const (
PendingTransactionsSubscription
// BlocksSubscription queries hashes for blocks that are imported
BlocksSubscription

//StateSubscription to listen main chain state
StateSubscription

// LastSubscription keeps track of the last index
LastIndexSubscription
)
Expand All @@ -68,6 +72,8 @@ const (
logsChanSize = 10
// chainEvChanSize is the size of channel listening to ChainEvent.
chainEvChanSize = 10
// stateEvChanSize is the size of channel listening to ChainEvent.
stateEvChanSize = 10
)

var (
Expand All @@ -82,6 +88,7 @@ type subscription struct {
logs chan []*types.Log
hashes chan []common.Hash
headers chan *types.Header
stateData chan *types.StateData
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
}
Expand All @@ -99,15 +106,18 @@ type EventSystem struct {
logsSub event.Subscription // Subscription for new log event
rmLogsSub event.Subscription // Subscription for removed log event
chainSub event.Subscription // Subscription for new chain event
stateSub event.Subscription // Subscription for new state change event
pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event

// Channels
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
txsCh chan core.NewTxsEvent // Channel to receive new transactions event
logsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
txsCh chan core.NewTxsEvent // Channel to receive new transactions event
logsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
stateCh chan core.NewStateChangeEvent // Channel to receive deposit state change event

}

// NewEventSystem creates a new manager that listens for event on the given mux,
Expand All @@ -127,19 +137,21 @@ func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventS
logsCh: make(chan []*types.Log, logsChanSize),
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
stateCh: make(chan core.NewStateChangeEvent, stateEvChanSize),
}

// Subscribe events
m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh)
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
m.stateSub = m.backend.SubscribeStateEvent(m.stateCh)
// TODO(rjl493456442): use feed to subscribe pending log event
m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})

// Make sure none of the subscriptions are empty
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil ||
m.pendingLogSub.Closed() {
m.stateSub == nil || m.pendingLogSub.Closed() {
log.Crit("Subscribe for event system failed")
}

Expand Down Expand Up @@ -292,6 +304,24 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: headers,
stateData: make(chan *types.StateData),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}

// SubscribeNewHeads creates a subscription that writes the header of a block that is
// imported in the chain.
func (es *EventSystem) SubscribeNewDeposits(stateData chan *types.StateData) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: StateSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
stateData: stateData,
installed: make(chan struct{}),
err: make(chan error),
}
Expand Down Expand Up @@ -355,6 +385,10 @@ func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
for _, f := range filters[PendingTransactionsSubscription] {
f.hashes <- hashes
}
case core.NewStateChangeEvent:
for _, f := range filters[StateSubscription] {
f.stateData <- e.StateData.StateData()
}
case core.ChainEvent:
for _, f := range filters[BlocksSubscription] {
f.headers <- e.Block.Header()
Expand Down Expand Up @@ -471,6 +505,8 @@ func (es *EventSystem) eventLoop() {
es.broadcast(index, ev)
case ev := <-es.chainCh:
es.broadcast(index, ev)
case ev := <-es.stateCh:
es.broadcast(index, ev)
case ev, active := <-es.pendingLogSub.Chan():
if !active { // system stopped
return
Expand Down
8 changes: 7 additions & 1 deletion ethclient/ethclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"fmt"
"math/big"

"github.com/maticnetwork/bor"
ethereum "github.com/maticnetwork/bor"
"github.com/maticnetwork/bor/common"
"github.com/maticnetwork/bor/common/hexutil"
"github.com/maticnetwork/bor/core/types"
Expand Down Expand Up @@ -324,6 +324,12 @@ func (ec *Client) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header)
return ec.c.EthSubscribe(ctx, ch, "newHeads")
}

// SubscribeNewHead subscribes to notifications about the current blockchain head
// on the given channel.
func (ec *Client) SubscribeNewDeposit(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
return ec.c.EthSubscribe(ctx, ch, "newDeposits")
}

// State Access

// NetworkID returns the network ID (also known as the chain ID) for this chain.
Expand Down
Loading

0 comments on commit 260243c

Please sign in to comment.