Skip to content

Commit

Permalink
Merge pull request ethereum#15 from rauljordan/txpool
Browse files Browse the repository at this point in the history
Subscribe to block headers from running geth node to propose collations
  • Loading branch information
rauljordan authored Feb 7, 2018
2 parents 11dc61f + 9f11c36 commit 5b27607
Show file tree
Hide file tree
Showing 8 changed files with 317 additions and 653 deletions.
2 changes: 1 addition & 1 deletion cmd/geth/shardingcmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var (
Flags: []cli.Flag{utils.DataDirFlag, utils.PasswordFileFlag, utils.NetworkIdFlag},
Category: "SHARDING COMMANDS",
Description: `
The Geth sharding client connects to a running geth node in sharding mode. This feature is a work in progress.
Launches a sharding client that connects to a running geth node and proposes collations to a Validator Manager Contract. This feature is a work in progress.
`,
}
)
Expand Down
79 changes: 71 additions & 8 deletions sharding/client.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package sharding

import (
"context"
"fmt"
"io/ioutil"
"math/big"
"strings"

"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
Expand Down Expand Up @@ -72,22 +77,34 @@ func (c *Client) Start() error {
return err
}

// TODO: Wait to be selected as collator in goroutine?
// Deposit 100ETH into the validator set in the VMC. Checks if account
// is already a validator in the VMC (in the case the client restarted).
// Once that's done we can subscribe to block headers.
//
// TODO: this function should store the validator's VMC index as a property
// in the client's struct
if err := joinValidatorSet(c); err != nil {
return err
}

// Listens to block headers from the geth node and if we are an eligible
// proposer, we fetch pending transactions and propose a collation
if err := subscribeBlockHeaders(c); err != nil {
return err
}
return nil
}

// Wait until sharding client is shutdown.
func (c *Client) Wait() {
// TODO: Blocking lock.
log.Info("Sharding client has been shutdown...")
}

// dialRPC endpoint to node.
func dialRPC(endpoint string) (*rpc.Client, error) {
if endpoint == "" {
endpoint = node.DefaultIPCEndpoint(clientIdentifier)
}
return rpc.Dial(endpoint)
// WatchCollationHeaders checks the logs for add_header func calls
// and updates the head collation of the client. We can probably store
// this as a property of the client struct
func (c *Client) WatchCollationHeaders() {

}

// UnlockAccount will unlock the specified account using utils.PasswordFileFlag or empty string if unset.
Expand All @@ -105,3 +122,49 @@ func (c *Client) unlockAccount(account accounts.Account) error {

return c.keystore.Unlock(account, pass)
}

func (c *Client) createTXOps(value *big.Int) (bind.TransactOpts, error) {

accounts := c.keystore.Accounts()
if len(accounts) == 0 {
return bind.TransactOpts{}, fmt.Errorf("no accounts found")
}

if err := c.unlockAccount(accounts[0]); err != nil {
return bind.TransactOpts{}, fmt.Errorf("unable to unlock account 0: %v", err)
}

if value.Cmp(big.NewInt(0)) == 0 {
return bind.TransactOpts{
From: accounts[0].Address,
Signer: func(signer types.Signer, addr common.Address, tx *types.Transaction) (*types.Transaction, error) {
networkID, err := c.client.NetworkID(context.Background())
if err != nil {
return nil, fmt.Errorf("unable to fetch networkID: %v", err)
}
return c.keystore.SignTx(accounts[0], tx, networkID /* chainID */)
},
}, nil
}

return bind.TransactOpts{
From: accounts[0].Address,
Value: value,
Signer: func(signer types.Signer, addr common.Address, tx *types.Transaction) (*types.Transaction, error) {
networkID, err := c.client.NetworkID(context.Background())
if err != nil {
return nil, fmt.Errorf("unable to fetch networkID: %v", err)
}
return c.keystore.SignTx(accounts[0], tx, networkID /* chainID */)
},
}, nil

}

// dialRPC endpoint to node.
func dialRPC(endpoint string) (*rpc.Client, error) {
if endpoint == "" {
endpoint = node.DefaultIPCEndpoint(clientIdentifier)
}
return rpc.Dial(endpoint)
}
105 changes: 105 additions & 0 deletions sharding/collator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package sharding

import (
"context"
"fmt"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"math/big"
)

// SubscribeBlockHeaders checks incoming block headers and determines if
// we are an eligible proposer for collations. Then, it finds the pending tx's
// from the running geth node and sorts them by descending order of gas price,
// eliminates those that ask for too much gas, and routes them over
// to the VMC to create a collation
func subscribeBlockHeaders(c *Client) error {
headerChan := make(chan *types.Header, 16)

_, err := c.client.SubscribeNewHead(context.Background(), headerChan)
if err != nil {
return fmt.Errorf("unable to subscribe to incoming headers. %v", err)
}

log.Info("Listening for new headers...")

for {
// TODO: Error handling for getting disconnected from the client
select {
case head := <-headerChan:
// Query the current state to see if we are an eligible proposer
log.Info(fmt.Sprintf("Received new header: %v", head.Number.String()))
// TODO: Only run this code on certain periods?
if err := checkShardsForProposal(c, head); err != nil {
return fmt.Errorf("unable to watch shards. %v", err)
}
}
}
}

// checkShardsForProposal checks if we are an eligible proposer for
// collation for the available shards in the VMC. The function calls
// getEligibleProposer from the VMC and proposes a collation if
// conditions are met
func checkShardsForProposal(c *Client, head *types.Header) error {

accounts := c.keystore.Accounts()
if len(accounts) == 0 {
return fmt.Errorf("no accounts found")
}

if err := c.unlockAccount(accounts[0]); err != nil {
return fmt.Errorf("cannot unlock account. %v", err)
}

log.Info("Checking if we are an eligible collation proposer for a shard...")
for s := int64(0); s < shardCount; s++ {
// Checks if we are an eligible proposer according to the VMC
period := head.Number.Div(head.Number, big.NewInt(periodLength))
addr, err := c.vmc.VMCCaller.GetEligibleProposer(&bind.CallOpts{}, big.NewInt(s), period)
// TODO: When we are not a proposer, we get the error of being unable to
// unmarshal empty output. Open issue to deal with this.

// If output is non-empty and the addr == coinbase
if err == nil && addr == accounts[0].Address {
log.Info(fmt.Sprintf("Selected as collator on shard: %d", s))
err := proposeCollation(s)
if err != nil {
return fmt.Errorf("could not propose collation. %v", err)
}
}
}

return nil
}

// proposeCollation interacts with the VMC directly to add a collation header
func proposeCollation(shardID int64) error {
// TODO: Adds a collation header to the VMC with the following fields:
// [
// shard_id: uint256,
// expected_period_number: uint256,
// period_start_prevhash: bytes32,
// parent_hash: bytes32,
// transactions_root: bytes32,
// coinbase: address,
// state_root: bytes32,
// receipts_root: bytes32,
// number: uint256,
// sig: bytes
// ]
//
// Before calling this, we would need to have access to the state of
// the period_start_prevhash. Refer to the comments in:
// https://github.com/ethereum/py-evm/issues/258#issuecomment-359879350
//
// This function will call FetchCandidateHead() of the VMC to obtain
// more necessary information.
//
// This functions will fetch the transactions in the txpool and and apply
// them to finish up the collation. It will then need to broadcast the
// collation to the main chain using JSON-RPC.
log.Info("Propose collation function called")
return nil
}
29 changes: 29 additions & 0 deletions sharding/collator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package sharding

import (
"context"
"fmt"
"testing"

"github.com/ethereum/go-ethereum/core/types"
)

type FakeClient struct {
client *FakeEthClient
}

type FakeEthClient struct{}

type FakeSubscription struct{}

func (ec *FakeEthClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (FakeSubscription, error) {
return FakeSubscription{}, fmt.Errorf("error, network disconnected!")
}

func TestSubscribeHeaders(t *testing.T) {
client := &FakeClient{client: &FakeEthClient{}}
err := subscribeBlockHeaders(client)
if err != nil {
t.Errorf("subscribe new headers should work", "no error", err)
}
}
4 changes: 2 additions & 2 deletions sharding/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (

var (
// Number of network shards
shardCount = 100
shardCount = int64(100)
// Address of the validator management contract
validatorManagerAddress = common.HexToAddress("0x0") // TODO
// Gas limit for verifying signatures
sigGasLimit = 40000
// Number of blocks in a period
periodLength = 5
periodLength = int64(5)
// Number of periods ahead of current period which the contract is able to return the collator of that period.
lookaheadPeriods = 4
// Required deposit size in wei
Expand Down
Loading

0 comments on commit 5b27607

Please sign in to comment.