Skip to content

Commit

Permalink
Merge pull request #273 from ethstorage/priv-dashboard-miner
Browse files Browse the repository at this point in the history
dashboard part 0: Add miner and submission status for network dashboard.
  • Loading branch information
ping-ke authored Apr 17, 2024
2 parents 90e09ed + ce6a474 commit c29ac4f
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 40 deletions.
5 changes: 3 additions & 2 deletions ethstorage/miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package miner
import (
"context"
"fmt"
"github.com/ethereum/go-ethereum/ethdb"
"math/big"
"sync"

Expand Down Expand Up @@ -57,7 +58,7 @@ type Miner struct {
lg log.Logger
}

func New(config *Config, storageMgr *ethstorage.StorageManager, api L1API, prover MiningProver, feed *event.Feed, lg log.Logger) *Miner {
func New(config *Config, db ethdb.Database, storageMgr *ethstorage.StorageManager, api L1API, prover MiningProver, feed *event.Feed, lg log.Logger) *Miner {
chainHeadCh := make(chan eth.L1BlockRef, chainHeadChanSize)
miner := &Miner{
feed: feed,
Expand All @@ -66,7 +67,7 @@ func New(config *Config, storageMgr *ethstorage.StorageManager, api L1API, prove
startCh: make(chan struct{}),
stopCh: make(chan struct{}),
lg: lg,
worker: newWorker(*config, storageMgr, api, chainHeadCh, prover, lg),
worker: newWorker(*config, db, storageMgr, api, chainHeadCh, prover, lg),
}
miner.wg.Add(1)
go miner.update()
Expand Down
4 changes: 3 additions & 1 deletion ethstorage/miner/miner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/event"
es "github.com/ethstorage/go-ethstorage/ethstorage"
"github.com/ethstorage/go-ethstorage/ethstorage/eth"
Expand Down Expand Up @@ -67,7 +68,8 @@ func newMiner(t *testing.T, storageMgr *es.StorageManager, client *eth.PollingCl
zkWorkingDir, _ := filepath.Abs("../prover")
pvr := prover.NewKZGPoseidonProver(zkWorkingDir, defaultConfig.ZKeyFileName, defaultConfig.ZKProverMode, lg)
fd := new(event.Feed)
miner := New(defaultConfig, storageMgr, l1api, &pvr, fd, lg)
db := rawdb.NewMemoryDatabase()
miner := New(defaultConfig, db, storageMgr, l1api, &pvr, fd, lg)
return miner
}

Expand Down
143 changes: 108 additions & 35 deletions ethstorage/miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package miner

import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
Expand All @@ -14,6 +15,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
es "github.com/ethstorage/go-ethstorage/ethstorage"
Expand All @@ -31,11 +33,25 @@ const (
)

var (
minedEventSig = crypto.Keccak256Hash([]byte("MinedBlock(uint256,uint256,uint256,uint256,address,uint256)"))
errCh = make(chan miningError, 10)
errDropped = errors.New("dropped: not enough profit")
minedEventSig = crypto.Keccak256Hash([]byte("MinedBlock(uint256,uint256,uint256,uint256,address,uint256)"))
errCh = make(chan miningError, 10)
errDropped = errors.New("dropped: not enough profit")
SubmissionStatusKey = []byte("SubmissionStatusKey")
MiningStatusKey = []byte("MiningStatusKey")
)

type MiningState struct {
MiningPower uint64 `json:"mining_power"`
SamplingTime uint64 `json:"sampling_time"`
}

type SubmissionState struct {
Succeeded int `json:"succeeded_submission"`
Failed int `json:"failed_submission"`
Dropped int `json:"dropped_submission"`
LastSucceededTime int64 `json:"last_succeeded_time"`
}

type task struct {
miner common.Address
shardIdx uint64
Expand Down Expand Up @@ -84,6 +100,7 @@ type worker struct {
config Config
l1API L1API
prover MiningProver
db ethdb.Database
storageMgr *es.StorageManager

chainHeadCh chan eth.L1BlockRef
Expand All @@ -96,32 +113,55 @@ type worker struct {
resultLock sync.Mutex
resultMap map[uint64]*result // protected by resultLock

miningStates map[uint64]*MiningState
submissionStates map[uint64]*SubmissionState

running int32
wg sync.WaitGroup
lg log.Logger
}

func newWorker(
config Config,
db ethdb.Database,
storageMgr *es.StorageManager,
api L1API,
chainHeadCh chan eth.L1BlockRef,
prover MiningProver,
lg log.Logger,
) *worker {
var submissionStates map[uint64]SubmissionState
if status, _ := db.Get(SubmissionStatusKey); status != nil {
if err := json.Unmarshal(status, &submissionStates); err != nil {
log.Error("Failed to decode submission states", "err", err)
}
}
worker := &worker{
config: config,
l1API: api,
prover: prover,
chainHeadCh: chainHeadCh,
shardTaskMap: make(map[uint64]task),
exitCh: make(chan struct{}),
startCh: make(chan uint64, 1),
resultCh: make(chan struct{}, 1),
resultLock: sync.Mutex{},
resultMap: make(map[uint64]*result),
storageMgr: storageMgr,
lg: lg,
config: config,
l1API: api,
prover: prover,
chainHeadCh: chainHeadCh,
shardTaskMap: make(map[uint64]task),
exitCh: make(chan struct{}),
startCh: make(chan uint64, 1),
resultCh: make(chan struct{}, 1),
miningStates: make(map[uint64]*MiningState),
submissionStates: make(map[uint64]*SubmissionState),
resultLock: sync.Mutex{},
resultMap: make(map[uint64]*result),
storageMgr: storageMgr,
db: db,
lg: lg,
}
for _, shardId := range storageMgr.Shards() {
worker.miningStates[shardId] = &MiningState{MiningPower: 0, SamplingTime: 0}
if submissionStates != nil {
if state, ok := submissionStates[shardId]; ok {
worker.submissionStates[shardId] = &state
continue
}
}
worker.submissionStates[shardId] = &SubmissionState{Succeeded: 0, Failed: 0, Dropped: 0, LastSucceededTime: 0}
}
worker.wg.Add(2)
go worker.newWorkLoop()
Expand Down Expand Up @@ -153,6 +193,31 @@ func (w *worker) close() {
close(ch)
}
}
w.saveStates()
}

func (w *worker) saveStates() {
states, err := json.Marshal(w.submissionStates)
if err != nil {
log.Error("Failed to marshal submission states", "err", err)
return
}
err = w.db.Put(SubmissionStatusKey, states)
if err != nil {
log.Error("Failed to store submission states", "err", err)
return
}

states, err = json.Marshal(w.miningStates)
if err != nil {
log.Error("Failed to marshal mining states", "err", err)
return
}
err = w.db.Put(MiningStatusKey, states)
if err != nil {
log.Error("Failed to store mining states", "err", err)
return
}
}

// newWorkLoop is a standalone goroutine to do the following upon received events:
Expand Down Expand Up @@ -317,10 +382,11 @@ func (w *worker) notifyResultLoop() {
func (w *worker) resultLoop() {
defer w.wg.Done()
var startTime = time.Now().Format("2006-01-02 15:04:05")
var succeeded, dropped int
errorCache := make([]miningError, 0)
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
saveStatesTicker := time.NewTicker(5 * time.Minute)
defer saveStatesTicker.Stop()
for {
select {
case <-w.resultCh:
Expand All @@ -335,15 +401,19 @@ func (w *worker) resultLoop() {
*result,
w.config,
)
if err != nil {
if err == errDropped {
dropped++
if s, ok := w.submissionStates[result.startShardId]; ok {
if err != nil {
if err == errDropped {
s.Dropped++
} else {
s.Failed++
errorCache = append(errorCache, miningError{result.startShardId, result.blockNumber, err})
w.lg.Error("Failed to submit mined result", "shard", result.startShardId, "block", result.blockNumber, "error", err.Error())
}
} else {
errorCache = append(errorCache, miningError{result.startShardId, result.blockNumber, err})
w.lg.Error("Failed to submit mined result", "shard", result.startShardId, "block", result.blockNumber, "error", err.Error())
s.Succeeded++
s.LastSucceededTime = time.Now().UnixMilli()
}
} else {
succeeded++
}
if txHash != (common.Hash{}) {
// waiting for tx confirmation or timeout
Expand Down Expand Up @@ -371,21 +441,18 @@ func (w *worker) resultLoop() {
// optimistically check next result if exists
w.notifyResultLoop()
case <-ticker.C:
for shardId, s := range w.submissionStates {
log.Info(fmt.Sprintf("Mining stats since %s", startTime), "shard", shardId, "succeeded", s.Succeeded, "failed", s.Failed, "dropped", s.Dropped)
}
if len(errorCache) > 0 {
log.Error(fmt.Sprintf("Mining stats since %s", startTime),
"succeeded", succeeded,
"failed", len(errorCache),
"dropped", dropped,
"lastError", errorCache[len(errorCache)-1],
)
} else {
log.Info(fmt.Sprintf("Mining stats since %s", startTime),
"succeeded", succeeded,
"failed", len(errorCache),
"dropped", dropped,
)
log.Error(fmt.Sprintf("Mining stats since %s", startTime), "lastError", errorCache[len(errorCache)-1])
}
case <-saveStatesTicker.C:
w.saveStates()
case err := <-errCh:
if s, ok := w.submissionStates[err.shardIdx]; ok {
s.Failed++
}
errorCache = append(errorCache, err)
case <-w.exitCh:
w.lg.Warn("Worker is exiting from result loop...")
Expand Down Expand Up @@ -450,6 +517,9 @@ func (w *worker) mineTask(t *taskItem) (bool, error) {
w.lg.Warn("Mining tasks timed out", "shard", t.shardIdx, "block", t.blockNumber,
"noncesTried", fmt.Sprintf("%d(%.1f%%)", nonceTriedTotal, float64(nonceTriedTotal*100)/float64(w.config.NonceLimit)),
)
miningState := w.miningStates[t.shardIdx]
miningState.SamplingTime = uint64(time.Since(startTime).Milliseconds())
miningState.MiningPower = nonceTriedTotal * 10000 / w.config.NonceLimit
}
w.lg.Debug("Mining task timed out", "shard", t.shardIdx, "thread", t.thread, "block", t.blockNumber, "noncesTried", nonce-t.nonceStart)
break
Expand All @@ -459,6 +529,9 @@ func (w *worker) mineTask(t *taskItem) (bool, error) {
if t.thread == 0 {
w.lg.Info("Sampling done with all nonces",
"samplingTime", samplingTime, "shard", t.shardIdx, "block", t.blockNumber)
miningState := w.miningStates[t.shardIdx]
miningState.SamplingTime = uint64(time.Since(startTime).Milliseconds())
miningState.MiningPower = 10000
}
w.lg.Debug("Sampling done with all nonces",
"samplingTime", samplingTime, "shard", t.shardIdx, "block", t.blockNumber, "thread", t.thread, "nonceEnd", nonce)
Expand Down
2 changes: 1 addition & 1 deletion ethstorage/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (n *EsNode) initMiner(ctx context.Context, cfg *Config) error {
cfg.Mining.ZKProverMode,
n.log,
)
n.miner = miner.New(cfg.Mining, n.storageManager, l1api, &pvr, n.feed, n.log)
n.miner = miner.New(cfg.Mining, n.db, n.storageManager, l1api, &pvr, n.feed, n.log)
log.Info("Initialized miner")
return nil
}
Expand Down
4 changes: 3 additions & 1 deletion integration_tests/node_mine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethstorage/go-ethstorage/cmd/es-utils/utils"
Expand Down Expand Up @@ -75,7 +76,8 @@ func TestMining(t *testing.T) {

l1api := miner.NewL1MiningAPI(pClient, lg)
pvr := prover.NewKZGPoseidonProver(miningConfig.ZKWorkingDir, miningConfig.ZKeyFileName, 2, lg)
mnr := miner.New(miningConfig, storageManager, l1api, &pvr, feed, lg)
db := rawdb.NewMemoryDatabase()
mnr := miner.New(miningConfig, db, storageManager, l1api, &pvr, feed, lg)
lg.Info("Initialized miner")

l1HeadsSub := event.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (event.Subscription, error) {
Expand Down

0 comments on commit c29ac4f

Please sign in to comment.