Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mining multiple shards support #187

Merged
merged 52 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from 51 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
8fbb779
test 2 shards
syntrust Jan 12, 2024
02f3bbc
merge
syntrust Jan 12, 2024
881fd78
merge
syntrust Jan 12, 2024
e46f6e5
start miner early
syntrust Jan 15, 2024
f706d4d
use unique key
syntrust Jan 17, 2024
c5d5a9c
merge
syntrust Jan 19, 2024
4297995
Merge branch 'main' of https://github.com/ethstorage/es-node into 2sh…
syntrust Jan 19, 2024
f76b3f4
remove useless code
syntrust Jan 22, 2024
7e3c34d
Merge branch 'rapidsnark' of https://github.com/ethstorage/es-node in…
syntrust Jan 22, 2024
c8b8888
mock mining process
syntrust Jan 23, 2024
dc75d58
merge
syntrust May 6, 2024
a2ac36c
use lastMineTime to check shard exist
syntrust May 7, 2024
5e8b966
support multiple shards
syntrust May 9, 2024
798e957
skip existed file
syntrust May 9, 2024
722b0d8
Merge branch 'rapidsnark' of https://github.com/ethstorage/es-node in…
syntrust May 9, 2024
9931c30
merge
syntrust May 9, 2024
f6d6242
Merge branch 'rapidsnark-test' of https://github.com/ethstorage/es-no…
syntrust May 9, 2024
c3f2267
updates
syntrust May 10, 2024
320e115
add comments
syntrust May 14, 2024
bf8c2bc
dynamic blob fee
syntrust May 15, 2024
9669615
fix blob price
syntrust May 15, 2024
4c169c3
fix blob price
syntrust May 15, 2024
63cfc94
fixes
syntrust May 15, 2024
6556bba
fix
syntrust May 15, 2024
d8ffc96
fix
syntrust May 16, 2024
b6eede9
remove useless
syntrust May 21, 2024
83f8513
fix test
syntrust May 21, 2024
55cc95c
Merge branch 'rapidsnark-test' of https://github.com/ethstorage/es-no…
syntrust Jun 20, 2024
b2367d0
Merge branch 'rapidsnark-test' of https://github.com/ethstorage/es-no…
syntrust Jun 20, 2024
55976ce
Merge branch 'rapidsnark-test' of https://github.com/ethstorage/es-no…
syntrust Jun 20, 2024
761017d
Merge branch 'rapidsnark-test' of https://github.com/ethstorage/es-no…
syntrust Jun 25, 2024
ba1a26a
Merge branch 'rapidsnark-test' of https://github.com/ethstorage/es-no…
syntrust Jun 28, 2024
1cf1427
Merge branch 'main' of https://github.com/ethstorage/es-node into 2sh…
syntrust Aug 2, 2024
d5b678e
merge
syntrust Aug 2, 2024
360566a
fix comments - add shard ingore status in contract
syntrust Aug 2, 2024
a00efd1
update init
syntrust Aug 2, 2024
b5f1eff
handle --storage.files
syntrust Aug 6, 2024
dd45aea
Merge branch 'run-opt1' of https://github.com/ethstorage/es-node into…
syntrust Aug 7, 2024
eecf5f5
minor
syntrust Aug 7, 2024
f16e9f5
merge
syntrust Aug 13, 2024
757a6d3
fix mining test
syntrust Aug 14, 2024
7edb102
Merge branch 'main' of https://github.com/ethstorage/es-node into 2sh…
syntrust Aug 14, 2024
14bcb0f
print history err
syntrust Aug 15, 2024
4b8c608
use eth_blobBaseFee
syntrust Aug 15, 2024
a20627d
fix test
syntrust Aug 15, 2024
46ad0c8
use upfrontPaymentInBatch
syntrust Aug 19, 2024
462f069
fix merge err
syntrust Aug 19, 2024
4b3cbed
minor
syntrust Aug 21, 2024
e320111
useful log
syntrust Aug 26, 2024
9063bd6
Merge branch 'main' of https://github.com/ethstorage/es-node into 2sh…
syntrust Aug 29, 2024
1790121
support not mining shard0
syntrust Aug 30, 2024
2da0c0d
fix comments
syntrust Sep 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 8 additions & 13 deletions cmd/es-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package main
import (
"context"
"fmt"
"math/big"
"net"
"os"
"os/signal"
Expand Down Expand Up @@ -231,20 +230,16 @@ func EsNodeInit(ctx *cli.Context) error {
log.Info("Storage config loaded", "storageCfg", storageCfg)
var shardIdxList []uint64
if len(shardIndexes) > 0 {
// check existense of shard indexes but add shard 0 anyway
out:
for i := 0; i < len(shardIndexes); i++ {
shard := uint64(shardIndexes[i])
if shard > 0 {
diff, err := getDifficulty(cctx, client, l1Contract, shard)
if err != nil {
log.Error("Failed to get shard info from contract", "error", err)
return err
}
if diff != nil && diff.Cmp(big.NewInt(0)) == 0 {
return fmt.Errorf("Shard not exist: %d", shard)
new := uint64(shardIndexes[i])
// prevent duplicated
for _, s := range shardIdxList {
syntrust marked this conversation as resolved.
Show resolved Hide resolved
if s == new {
continue out
}
}
shardIdxList = append(shardIdxList, shard)
shardIdxList = append(shardIdxList, new)
}
} else {
// get shard indexes of length shardLen from contract
Expand All @@ -254,7 +249,7 @@ func EsNodeInit(ctx *cli.Context) error {
return err
}
if len(shardList) == 0 {
return fmt.Errorf("No shard indexes found")
return fmt.Errorf("no shard indexes found")
}
shardIdxList = shardList
}
Expand Down
16 changes: 12 additions & 4 deletions cmd/es-node/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ func getShardList(ctx context.Context, client *ethclient.Client, contract common
}

func getDifficulty(ctx context.Context, client *ethclient.Client, contract common.Address, shardIdx uint64) (*big.Int, error) {
res, err := getMiningInfo(ctx, client, contract, shardIdx)
if err != nil {
return nil, err
}
return res[1].(*big.Int), nil
}

func getMiningInfo(ctx context.Context, client *ethclient.Client, contract common.Address, shardIdx uint64) ([]interface{}, error) {
uint256Type, _ := abi.NewType("uint256", "", nil)
dataField, _ := abi.Arguments{{Type: uint256Type}}.Pack(new(big.Int).SetUint64(shardIdx))
h := crypto.Keccak256Hash([]byte(`infos(uint256)`))
Expand All @@ -136,10 +144,10 @@ func getDifficulty(ctx context.Context, client *ethclient.Client, contract commo
{Type: uint256Type},
}.UnpackValues(bs)
if res == nil || len(res) < 3 {
log.Error("Query difficulty by shard", "error", "invalid result", "result", res)
log.Error("Query mining info by shard", "error", "invalid result", "result", res)
return nil, fmt.Errorf("invalid result: %v", res)
}
return res[1].(*big.Int), nil
return res, nil
}

func createDataFile(cfg *storage.StorageConfig, shardIdxList []uint64, datadir string, encodingType int) ([]string, error) {
Expand All @@ -154,8 +162,8 @@ func createDataFile(cfg *storage.StorageConfig, shardIdxList []uint64, datadir s
for _, shardIdx := range shardIdxList {
dataFile := filepath.Join(datadir, fmt.Sprintf(fileName, shardIdx))
if _, err := os.Stat(dataFile); err == nil {
log.Error("Creating data file", "error", "file already exists, will not overwrite", "file", dataFile)
return nil, err
log.Warn("Creating data file", "error", "file already exists, will not overwrite", "file", dataFile)
continue
}
if cfg.ChunkSize == 0 {
return nil, fmt.Errorf("chunk size should not be 0")
Expand Down
61 changes: 52 additions & 9 deletions cmd/es-utils/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,24 @@ func SendBlobTx(
}
}

maxFeePerDataGas256, err := DecodeUint256String(maxFeePerDataGas)
if err != nil {
log.Crit("Invalid max_fee_per_data_gas", "error", err)
var blobPrice *uint256.Int
if maxFeePerDataGas != "" {
maxFeePerDataGas256, err := DecodeUint256String(maxFeePerDataGas)
if err != nil {
log.Crit("Invalid max_fee_per_data_gas", "error", err)
}
blobPrice = maxFeePerDataGas256
} else {
blobBaseFee, err := queryBlobBaseFee(client)
if err != nil {
log.Crit("Error getting blob base fee", "error", err)
}
log.Info("Query blob base fee done", "blobBaseFee", blobBaseFee)
blobBaseFee256, nok := uint256.FromBig(blobBaseFee)
if nok {
log.Crit("Error converting blob base fee to uint256", "blobBaseFee", blobBaseFee)
}
blobPrice = blobBaseFee256
}
var blobs []kzg4844.Blob
if needEncoding {
Expand Down Expand Up @@ -159,7 +174,7 @@ func SendBlobTx(
To: to,
Value: value256,
Data: calldataBytes,
BlobFeeCap: maxFeePerDataGas256,
BlobFeeCap: blobPrice,
BlobHashes: versionedHashes,
Sidecar: sideCar,
}
Expand Down Expand Up @@ -300,6 +315,8 @@ func UploadBlobs(
}
signer := crypto.PubkeyToAddress(key.PublicKey)
var keys []common.Hash
var blobIndex []*big.Int
var lengthes []*big.Int

var blobs []kzg4844.Blob
if needEncoding {
Expand All @@ -309,10 +326,23 @@ func UploadBlobs(
}
for i, blob := range blobs {
keys = append(keys, genKey(signer, i, blob[:]))
blobIndex = append(blobIndex, new(big.Int).SetUint64(uint64(i)))
lengthes = append(lengthes, new(big.Int).SetUint64(BlobSize))
}
log.Info("blobs", "keys", keys, "blobIndexes", blobIndex, "sizes", lengthes)
bytes32Array, _ := abi.NewType("bytes32[]", "", nil)
dataField, _ := abi.Arguments{{Type: bytes32Array}}.Pack(keys)
h := crypto.Keccak256Hash([]byte("putBlobs(bytes32[])"))
uint256Array, _ := abi.NewType("uint256[]", "", nil)
args := abi.Arguments{
{Type: bytes32Array},
{Type: uint256Array},
{Type: uint256Array},
}
dataField, err := args.Pack(keys, blobIndex, lengthes)
if err != nil {
log.Error("Failed to pack data", "err", err)
return nil, nil, err
}
h := crypto.Keccak256Hash([]byte("putBlobs(bytes32[],uint256[],uint256[])"))
calldata := "0x" + common.Bytes2Hex(append(h[0:4], dataField...))
tx := SendBlobTx(
rpc,
Expand All @@ -325,7 +355,7 @@ func UploadBlobs(
5000000,
"",
"",
"300000000",
"",
chainID,
calldata,
)
Expand Down Expand Up @@ -371,10 +401,23 @@ func UploadBlobs(
log.Info("Timed out for receipt, query contract for data hash...")
}
// if wait receipt timed out or failed, query contract for data hash
return getKvInfo(pc, contractAddr, len(blobs))
return getKvInfo(pc, len(blobs))
syntrust marked this conversation as resolved.
Show resolved Hide resolved
}

func queryBlobBaseFee(l1 *ethclient.Client) (*big.Int, error) {
var hex string
err := l1.Client().CallContext(context.Background(), &hex, "eth_blobBaseFee")
if err != nil {
return nil, err
}
blobBaseFee, ok := new(big.Int).SetString(hex, 0)
if !ok {
return nil, errors.New("invalid blob base fee")
}
return blobBaseFee, nil
}

func getKvInfo(pc *eth.PollingClient, contractAddr common.Address, blobLen int) ([]uint64, []common.Hash, error) {
func getKvInfo(pc *eth.PollingClient, blobLen int) ([]uint64, []common.Hash, error) {
lastIdx, err := pc.GetStorageLastBlobIdx(rpc.LatestBlockNumber.Int64())
if err != nil {
return nil, nil, err
Expand Down
11 changes: 7 additions & 4 deletions ethstorage/miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func (w *worker) taskLoop(taskCh chan *taskItem) {
w.lg.Info("Mine task success", "shard", ti.shardIdx, "thread", ti.thread, "block", ti.blockNumber)
}
case <-w.exitCh:
w.lg.Warn("Worker is exiting from task loop...")
w.lg.Debug("Worker is exiting from task loop...")
return
}
}
Expand Down Expand Up @@ -459,6 +459,9 @@ func (w *worker) resultLoop() {
errorCache = append(errorCache, err)
case <-w.exitCh:
w.lg.Warn("Worker is exiting from result loop...")
for _, e := range errorCache {
w.lg.Error("Mining error since es-node launched", "err", e)
}
return
}
}
Expand Down Expand Up @@ -548,19 +551,19 @@ func (w *worker) mineTask(t *taskItem) (bool, error) {
return false, err
}
if t.requiredDiff.Cmp(new(big.Int).SetBytes(hash1.Bytes())) >= 0 {
w.lg.Info("Calculated a valid hash", "shard", t.shardIdx, "thread", t.thread, "block", t.blockNumber, "nonce", nonce)
w.lg.Info("Calculated a valid hash", "shard", t.shardIdx, "block", t.blockNumber, "timestamp", t.mineTime, "randao", t.mixHash, "nonce", nonce, "hash0", hash0, "hash1", hash1, "sampleIdxs", sampleIdxs)
dataSet, kvIdxs, sampleIdxsInKv, encodingKeys, encodedSamples, err := w.getMiningData(t.task, sampleIdxs)
if err != nil {
w.lg.Error("Get sample data failed", "kvIdxs", kvIdxs, "sampleIdxsInKv", sampleIdxsInKv, "err", err.Error())
return false, err
}
w.lg.Info("Got sample data", "shard", t.shardIdx, "thread", t.thread, "block", t.blockNumber, "kvIdxs", kvIdxs, "sampleIdxsInKv", sampleIdxsInKv)
w.lg.Info("Got sample data", "shard", t.shardIdx, "block", t.blockNumber, "encodedSamples", encodedSamples)
masks, decodeProof, inclusiveProofs, err := w.prover.GetStorageProof(dataSet, encodingKeys, sampleIdxsInKv)
if err != nil {
w.lg.Error("Get storage proof error", "kvIdx", kvIdxs, "sampleIdxsInKv", sampleIdxsInKv, "error", err.Error())
return false, fmt.Errorf("get proof err: %v", err)
}
w.lg.Info("Got storage proof", "shard", t.shardIdx, "thread", t.thread, "block", t.blockNumber, "kvIdx", kvIdxs, "sampleIdxsInKv", sampleIdxsInKv)
w.lg.Info("Got storage proof", "shard", t.shardIdx, "block", t.blockNumber, "kvIdx", kvIdxs, "sampleIdxsInKv", sampleIdxsInKv)
newResult := &result{
blockNumber: t.blockNumber,
startShardId: t.shardIdx,
Expand Down
4 changes: 2 additions & 2 deletions ethstorage/storage_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (s *StorageManager) CommitBlobs(kvIndices []uint64, blobs [][]byte, commits
for i := 0; i < len(kvIndices); i++ {
encodedBlob, success, err := s.shardManager.TryEncodeKV(kvIndices[i], blobs[i], commits[i])
if !success || err != nil {
log.Warn("Blob encode failed", "index", kvIndices[i], "err", err.Error())
log.Warn("Blob encode failed", "index", kvIndices[i], "err", err)
continue
}
encodedBlobs[i] = encodedBlob
Expand Down Expand Up @@ -230,7 +230,7 @@ func (s *StorageManager) CommitEmptyBlobs(start, limit uint64) (uint64, uint64,
for i := start; i <= limit; i++ {
encodedBlob, success, err := s.shardManager.TryEncodeKV(i, emptyBs, hash)
if !success || err != nil {
log.Warn("Blob encode failed", "index", i, "err", err.Error())
log.Warn("Blob encode failed", "index", i, "err", err)
break
}
encodedBlobs = append(encodedBlobs, encodedBlob)
Expand Down
2 changes: 1 addition & 1 deletion init-l2.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@
./init.sh \
--l1.rpc http://65.109.20.29:8545 \
--storage.l1contract 0x64003adbdf3014f7E38FC6BE752EB047b95da89A \
$@
$@
23 changes: 11 additions & 12 deletions init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ zkp_impl=1
zkp_mode=2

remaining_args=""
shards="--shard_index 0"

while [ $# -gt 0 ]; do
if [[ $1 == --miner.zk-prover-impl ]]; then
Expand All @@ -24,6 +25,9 @@ while [ $# -gt 0 ]; do
zkp_mode=$2
shift 2
else
if [[ $1 == --shard_index ]]; then
shards=""
fi
remaining_args="$remaining_args $1"
shift
fi
Expand Down Expand Up @@ -100,22 +104,17 @@ if [ "$zkp_impl" = 1 ]; then
fi

data_dir="./es-data"
storage_file_0="$data_dir/shard-0.dat"

es_node_init="$executable init --shard_index 0 \
es_node_init="$executable init $shards \
--datadir $data_dir \
--l1.rpc http://88.99.30.186:8545 \
--storage.l1contract 0x804C520d3c084C805E37A35E90057Ac32831F96f \
$remaining_args"

# create data file for shard 0 if not yet
if [ ! -e $storage_file_0 ]; then
if $es_node_init ; then
echo "√ Initialized ${storage_file_0} successfully"
else
echo "Error: failed to initialize ${storage_file_0}"
exit 1
fi
else
echo "Warning: storage file ${storage_file_0} already exists, skip initialization."
# es-node will skip init if data files already exist
if $es_node_init ; then
echo "√ Initialized data files successfully."
else
echo "Error: failed to initialize data files."
exit 1
fi
3 changes: 3 additions & 0 deletions integration_tests/gen_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,6 @@ func generateRandomContent(sizeInKB int) []byte {
}
return []byte(content)
}
func generateRandomBlobs(blobLen int) []byte {
return generateRandomContent(128 * 31 / 32 * blobLen)
}
Loading
Loading