Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dashboard part 0: Add miner and submission status for network dashboard. #273

Merged
merged 7 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ethstorage/miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package miner
import (
"context"
"fmt"
"github.com/ethereum/go-ethereum/ethdb"
"math/big"
"sync"

Expand Down Expand Up @@ -57,7 +58,7 @@ type Miner struct {
lg log.Logger
}

func New(config *Config, storageMgr *ethstorage.StorageManager, api L1API, prover MiningProver, feed *event.Feed, lg log.Logger) *Miner {
func New(config *Config, db ethdb.Database, storageMgr *ethstorage.StorageManager, api L1API, prover MiningProver, feed *event.Feed, lg log.Logger) *Miner {
chainHeadCh := make(chan eth.L1BlockRef, chainHeadChanSize)
miner := &Miner{
feed: feed,
Expand All @@ -66,7 +67,7 @@ func New(config *Config, storageMgr *ethstorage.StorageManager, api L1API, prove
startCh: make(chan struct{}),
stopCh: make(chan struct{}),
lg: lg,
worker: newWorker(*config, storageMgr, api, chainHeadCh, prover, lg),
worker: newWorker(*config, db, storageMgr, api, chainHeadCh, prover, lg),
}
miner.wg.Add(1)
go miner.update()
Expand Down
4 changes: 3 additions & 1 deletion ethstorage/miner/miner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/event"
es "github.com/ethstorage/go-ethstorage/ethstorage"
"github.com/ethstorage/go-ethstorage/ethstorage/eth"
Expand Down Expand Up @@ -67,7 +68,8 @@ func newMiner(t *testing.T, storageMgr *es.StorageManager, client *eth.PollingCl
zkWorkingDir, _ := filepath.Abs("../prover")
pvr := prover.NewKZGPoseidonProver(zkWorkingDir, defaultConfig.ZKeyFileName, defaultConfig.ZKProverMode, lg)
fd := new(event.Feed)
miner := New(defaultConfig, storageMgr, l1api, &pvr, fd, lg)
db := rawdb.NewMemoryDatabase()
miner := New(defaultConfig, db, storageMgr, l1api, &pvr, fd, lg)
return miner
}

Expand Down
143 changes: 108 additions & 35 deletions ethstorage/miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package miner

import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
Expand All @@ -14,6 +15,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
es "github.com/ethstorage/go-ethstorage/ethstorage"
Expand All @@ -31,11 +33,25 @@ const (
)

var (
minedEventSig = crypto.Keccak256Hash([]byte("MinedBlock(uint256,uint256,uint256,uint256,address,uint256)"))
errCh = make(chan miningError, 10)
errDropped = errors.New("dropped: not enough profit")
minedEventSig = crypto.Keccak256Hash([]byte("MinedBlock(uint256,uint256,uint256,uint256,address,uint256)"))
errCh = make(chan miningError, 10)
errDropped = errors.New("dropped: not enough profit")
SubmissionStatusKey = []byte("SubmissionStatusKey")
MiningStatusKey = []byte("MiningStatusKey")
)

type MiningState struct {
MiningPower uint64 `json:"mining_power"`
SamplingTime uint64 `json:"sampling_time"`
}

type SubmissionState struct {
Succeeded int `json:"succeeded_submission"`
Failed int `json:"failed_submission"`
Dropped int `json:"dropped_submission"`
LastSucceededTime int64 `json:"last_succeeded_time"`
}

type task struct {
miner common.Address
shardIdx uint64
Expand Down Expand Up @@ -84,6 +100,7 @@ type worker struct {
config Config
l1API L1API
prover MiningProver
db ethdb.Database
storageMgr *es.StorageManager

chainHeadCh chan eth.L1BlockRef
Expand All @@ -96,32 +113,55 @@ type worker struct {
resultLock sync.Mutex
resultMap map[uint64]*result // protected by resultLock

miningStates map[uint64]*MiningState
submissionStates map[uint64]*SubmissionState

running int32
wg sync.WaitGroup
lg log.Logger
}

func newWorker(
config Config,
db ethdb.Database,
storageMgr *es.StorageManager,
api L1API,
chainHeadCh chan eth.L1BlockRef,
prover MiningProver,
lg log.Logger,
) *worker {
var submissionStates map[uint64]SubmissionState
if status, _ := db.Get(SubmissionStatusKey); status != nil {
if err := json.Unmarshal(status, &submissionStates); err != nil {
log.Error("Failed to decode submission states", "err", err)
}
}
worker := &worker{
config: config,
l1API: api,
prover: prover,
chainHeadCh: chainHeadCh,
shardTaskMap: make(map[uint64]task),
exitCh: make(chan struct{}),
startCh: make(chan uint64, 1),
resultCh: make(chan struct{}, 1),
resultLock: sync.Mutex{},
resultMap: make(map[uint64]*result),
storageMgr: storageMgr,
lg: lg,
config: config,
l1API: api,
prover: prover,
chainHeadCh: chainHeadCh,
shardTaskMap: make(map[uint64]task),
exitCh: make(chan struct{}),
startCh: make(chan uint64, 1),
resultCh: make(chan struct{}, 1),
miningStates: make(map[uint64]*MiningState),
submissionStates: make(map[uint64]*SubmissionState),
resultLock: sync.Mutex{},
resultMap: make(map[uint64]*result),
storageMgr: storageMgr,
db: db,
lg: lg,
}
for _, shardId := range storageMgr.Shards() {
worker.miningStates[shardId] = &MiningState{MiningPower: 0, SamplingTime: 0}
if submissionStates != nil {
if state, ok := submissionStates[shardId]; ok {
worker.submissionStates[shardId] = &state
continue
}
}
worker.submissionStates[shardId] = &SubmissionState{Succeeded: 0, Failed: 0, Dropped: 0, LastSucceededTime: 0}
}
worker.wg.Add(2)
go worker.newWorkLoop()
Expand Down Expand Up @@ -153,6 +193,31 @@ func (w *worker) close() {
close(ch)
}
}
w.saveStates()
}

func (w *worker) saveStates() {
states, err := json.Marshal(w.submissionStates)
if err != nil {
log.Error("Failed to marshal submission states", "err", err)
return
}
err = w.db.Put(SubmissionStatusKey, states)
if err != nil {
log.Error("Failed to store submission states", "err", err)
return
}

states, err = json.Marshal(w.miningStates)
if err != nil {
log.Error("Failed to marshal mining states", "err", err)
return
}
err = w.db.Put(MiningStatusKey, states)
if err != nil {
log.Error("Failed to store mining states", "err", err)
return
}
}

// newWorkLoop is a standalone goroutine to do the following upon received events:
Expand Down Expand Up @@ -317,10 +382,11 @@ func (w *worker) notifyResultLoop() {
func (w *worker) resultLoop() {
defer w.wg.Done()
var startTime = time.Now().Format("2006-01-02 15:04:05")
var succeeded, dropped int
errorCache := make([]miningError, 0)
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
saveStatesTicker := time.NewTicker(5 * time.Minute)
defer saveStatesTicker.Stop()
for {
select {
case <-w.resultCh:
Expand All @@ -335,15 +401,19 @@ func (w *worker) resultLoop() {
*result,
w.config,
)
if err != nil {
if err == errDropped {
dropped++
if s, ok := w.submissionStates[result.startShardId]; ok {
if err != nil {
if err == errDropped {
s.Dropped++
} else {
s.Failed++
errorCache = append(errorCache, miningError{result.startShardId, result.blockNumber, err})
w.lg.Error("Failed to submit mined result", "shard", result.startShardId, "block", result.blockNumber, "error", err.Error())
}
} else {
errorCache = append(errorCache, miningError{result.startShardId, result.blockNumber, err})
w.lg.Error("Failed to submit mined result", "shard", result.startShardId, "block", result.blockNumber, "error", err.Error())
s.Succeeded++
s.LastSucceededTime = time.Now().UnixMilli()
}
} else {
succeeded++
}
if txHash != (common.Hash{}) {
// waiting for tx confirmation or timeout
Expand Down Expand Up @@ -371,21 +441,18 @@ func (w *worker) resultLoop() {
// optimistically check next result if exists
w.notifyResultLoop()
case <-ticker.C:
for shardId, s := range w.submissionStates {
log.Info(fmt.Sprintf("Mining stats since %s", startTime), "shard", shardId, "succeeded", s.Succeeded, "failed", s.Failed, "dropped", s.Dropped)
}
if len(errorCache) > 0 {
log.Error(fmt.Sprintf("Mining stats since %s", startTime),
"succeeded", succeeded,
"failed", len(errorCache),
"dropped", dropped,
"lastError", errorCache[len(errorCache)-1],
)
} else {
log.Info(fmt.Sprintf("Mining stats since %s", startTime),
"succeeded", succeeded,
"failed", len(errorCache),
"dropped", dropped,
)
log.Error(fmt.Sprintf("Mining stats since %s", startTime), "lastError", errorCache[len(errorCache)-1])
}
case <-saveStatesTicker.C:
w.saveStates()
ping-ke marked this conversation as resolved.
Show resolved Hide resolved
case err := <-errCh:
if s, ok := w.submissionStates[err.shardIdx]; ok {
s.Failed++
}
errorCache = append(errorCache, err)
case <-w.exitCh:
w.lg.Warn("Worker is exiting from result loop...")
Expand Down Expand Up @@ -450,6 +517,9 @@ func (w *worker) mineTask(t *taskItem) (bool, error) {
w.lg.Warn("Mining tasks timed out", "shard", t.shardIdx, "block", t.blockNumber,
"noncesTried", fmt.Sprintf("%d(%.1f%%)", nonceTriedTotal, float64(nonceTriedTotal*100)/float64(w.config.NonceLimit)),
)
miningState := w.miningStates[t.shardIdx]
miningState.SamplingTime = uint64(time.Since(startTime).Milliseconds())
miningState.MiningPower = nonceTriedTotal * 10000 / w.config.NonceLimit
}
w.lg.Debug("Mining task timed out", "shard", t.shardIdx, "thread", t.thread, "block", t.blockNumber, "noncesTried", nonce-t.nonceStart)
break
Expand All @@ -459,6 +529,9 @@ func (w *worker) mineTask(t *taskItem) (bool, error) {
if t.thread == 0 {
w.lg.Info("Sampling done with all nonces",
"samplingTime", samplingTime, "shard", t.shardIdx, "block", t.blockNumber)
miningState := w.miningStates[t.shardIdx]
miningState.SamplingTime = uint64(time.Since(startTime).Milliseconds())
miningState.MiningPower = 10000
}
w.lg.Debug("Sampling done with all nonces",
"samplingTime", samplingTime, "shard", t.shardIdx, "block", t.blockNumber, "thread", t.thread, "nonceEnd", nonce)
Expand Down
2 changes: 1 addition & 1 deletion ethstorage/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (n *EsNode) initMiner(ctx context.Context, cfg *Config) error {
cfg.Mining.ZKProverMode,
n.log,
)
n.miner = miner.New(cfg.Mining, n.storageManager, l1api, &pvr, n.feed, n.log)
n.miner = miner.New(cfg.Mining, n.db, n.storageManager, l1api, &pvr, n.feed, n.log)
log.Info("Initialized miner")
return nil
}
Expand Down
4 changes: 3 additions & 1 deletion integration_tests/node_mine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethstorage/go-ethstorage/cmd/es-utils/utils"
Expand Down Expand Up @@ -75,7 +76,8 @@ func TestMining(t *testing.T) {

l1api := miner.NewL1MiningAPI(pClient, lg)
pvr := prover.NewKZGPoseidonProver(miningConfig.ZKWorkingDir, miningConfig.ZKeyFileName, 2, lg)
mnr := miner.New(miningConfig, storageManager, l1api, &pvr, feed, lg)
db := rawdb.NewMemoryDatabase()
mnr := miner.New(miningConfig, db, storageManager, l1api, &pvr, feed, lg)
lg.Info("Initialized miner")

l1HeadsSub := event.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (event.Subscription, error) {
Expand Down
Loading