Skip to content
This repository was archived by the owner on Sep 23, 2024. It is now read-only.

Commit

Permalink
Continue refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
vcastellm committed Dec 18, 2023
1 parent 864abdf commit 0c436fc
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 94 deletions.
18 changes: 15 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/0xPolygon/beethoven/config"
"github.com/0xPolygon/beethoven/db"
"github.com/0xPolygon/beethoven/etherman"
"github.com/0xPolygon/beethoven/interop"
"github.com/0xPolygon/beethoven/network"
"github.com/0xPolygon/beethoven/rpc"
)
Expand Down Expand Up @@ -124,6 +125,18 @@ func start(cliCtx *cli.Context) error {
log.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), i.rpcTimeout)
defer cancel()

executor := interop.New(
log,
c,
addr,
storage,
ethMan,
etm,
)

// Register services
server := jsonrpc.NewServer(
c.RPC,
Expand All @@ -133,9 +146,8 @@ func start(cliCtx *cli.Context) error {
&dummyinterfaces.DummyStorage{},
[]jsonrpc.Service{
{
Name: rpc.INTEROP,
Service: rpc.NewInteropEndpoints(addr, storage, &ethMan,
c.FullNodeRPCs, c.RPC.ReadTimeout.Duration, etm),
Name: rpc.INTEROP,
Service: rpc.NewInteropEndpoints(ctx, executor, storage),
},
},
)
Expand Down
2 changes: 0 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/spf13/viper"
"github.com/urfave/cli/v2"

"github.com/0xPolygon/beethoven/rpc"
)

const (
Expand Down
24 changes: 24 additions & 0 deletions interop/query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package interop

import (
"context"
"fmt"

"github.com/0xPolygon/cdk-validium-node/jsonrpc/types"
"github.com/ethereum/go-ethereum/common"
"github.com/jackc/pgx/v4"
)

func (e *Executor) GetTxStatus(ctx context.Context, hash common.Hash, dbTx pgx.Tx) (result interface{}, err types.Error) {
res, innerErr := e.ethTxMan.Result(ctx, ethTxManOwner, hash.Hex(), dbTx)
if innerErr != nil {
result = "0x0"
err = types.NewRPCError(types.DefaultErrorCode, fmt.Sprintf("failed to get tx, error: %s", innerErr))

return
}

result = res.Status.String()

return
}
110 changes: 21 additions & 89 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@ package rpc
import (
"context"
"fmt"
"math/big"
"time"

"github.com/0xPolygon/beethoven/config"
"github.com/0xPolygon/beethoven/interop"
"github.com/0xPolygon/cdk-validium-node/jsonrpc/client"
"github.com/0xPolygon/cdk-validium-node/jsonrpc/types"
"github.com/0xPolygon/cdk-validium-node/log"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"

"github.com/0xPolygon/beethoven/tx"
Expand All @@ -33,126 +29,62 @@ func (zc *zkEVMClientCreator) NewClient(rpc string) interop.ZkEVMClientInterface

// InteropEndpoints contains implementations for the "interop" RPC endpoints
type InteropEndpoints struct {
interop *interop.Executor
ctx context.Context
executor *interop.Executor
db interop.DBInterface
etherman interop.EthermanInterface
// interopAdminAddr common.Address
fullNodeRPCs config.FullNodeRPCs
rpcTimeout time.Duration
ethTxManager interop.EthTxManager
zkEVMClientCreator interop.ZkEVMClientClientCreator
}

// NewInteropEndpoints returns InteropEndpoints
func NewInteropEndpoints(
interop *interop.Executor,
// interopAdminAddr common.Address,
ctx context.Context,
executor *interop.Executor,
db interop.DBInterface,
etherman interop.EthermanInterface,
fullNodeRPCs config.FullNodeRPCs,
rpcTimeout time.Duration,
ethTxManager interop.EthTxManager,
) *InteropEndpoints {
return &InteropEndpoints{
db: db,
// interopAdminAddr: interopAdminAddr,
etherman: etherman,
fullNodeRPCs: fullNodeRPCs,
rpcTimeout: rpcTimeout,
ethTxManager: ethTxManager,
zkEVMClientCreator: &zkEVMClientCreator{},
ctx: ctx,
executor: executor,
db: db,
}
}

func (i *InteropEndpoints) SendTx(signedTx tx.SignedTx) (interface{}, types.Error) {
ctx, cancel := context.WithTimeout(context.Background(), i.rpcTimeout)
defer cancel()

// Check if the RPC is actually registered, if not it won't be possible to assert soundness (in the future once we are stateless won't be needed)
if _, ok := i.fullNodeRPCs[signedTx.Tx.L1Contract]; !ok {
if err := i.executor.CheckTx(i.ctx, signedTx); err != nil {
return "0x0", types.NewRPCError(types.DefaultErrorCode, fmt.Sprintf("there is no RPC registered for %s", signedTx.Tx.L1Contract))
}

// Verify ZKP using eth_call
l1TxData, err := i.etherman.BuildTrustedVerifyBatchesTxData(
uint64(signedTx.Tx.LastVerifiedBatch),
uint64(signedTx.Tx.NewVerifiedBatch),
signedTx.Tx.ZKP,
)
if err != nil {
return "0x0", types.NewRPCError(types.DefaultErrorCode, fmt.Sprintf("failed to build verify ZKP tx: %s", err))
}
msg := ethereum.CallMsg{
From: i.interopAdminAddr,
To: &signedTx.Tx.L1Contract,
Data: l1TxData,
}
res, err := i.etherman.CallContract(ctx, msg, nil)
if err != nil {
return "0x0", types.NewRPCError(types.DefaultErrorCode, fmt.Sprintf("failed to call verify ZKP response: %s, error: %s", res, err))
if err := i.executor.Verify(signedTx); err != nil {
return "0x0", types.NewRPCError(types.DefaultErrorCode, fmt.Sprintf("failed to verify tx: %s", err))
}

// Auth: check signature vs admin
signer, err := signedTx.Signer()
if err != nil {
return "0x0", types.NewRPCError(types.DefaultErrorCode, "failed to get signer")
}

sequencer, err := i.etherman.GetSequencerAddr(signedTx.Tx.L1Contract)
if err != nil {
return "0x0", types.NewRPCError(types.DefaultErrorCode, "failed to get admin from L1")
}

if sequencer != signer {
return "0x0", types.NewRPCError(types.DefaultErrorCode, "unexpected signer")
}

// Check expected root vs root from the managed full node
// TODO: go stateless, depends on https://github.com/0xPolygonHermez/zkevm-prover/issues/581
// when this happens we should go async from here, since processing all the batches could take a lot of time
zkEVMClient := i.zkEVMClientCreator.NewClient(i.fullNodeRPCs[signedTx.Tx.L1Contract])
batch, err := zkEVMClient.BatchByNumber(
ctx,
big.NewInt(int64(signedTx.Tx.NewVerifiedBatch)),
)
if err != nil {
return "0x0", types.NewRPCError(types.DefaultErrorCode, fmt.Sprintf("failed to get batch from our node, error: %s", err))
}

if batch.StateRoot != signedTx.Tx.ZKP.NewStateRoot || batch.LocalExitRoot != signedTx.Tx.ZKP.NewLocalExitRoot {
return "0x0", types.NewRPCError(types.DefaultErrorCode, fmt.Sprintf(
"Mismatch detected, expected local exit root: %s actual: %s. expected state root: %s actual: %s",
signedTx.Tx.ZKP.NewLocalExitRoot.Hex(),
batch.LocalExitRoot.Hex(),
signedTx.Tx.ZKP.NewStateRoot.Hex(),
batch.StateRoot.Hex(),
))
if err := i.executor.Execute(signedTx); err != nil {
return "0x0", types.NewRPCError(types.DefaultErrorCode, fmt.Sprintf("failed to execute tx: %s", err))
}

// Send L1 tx
dbTx, err := i.db.BeginStateTransaction(ctx)
dbTx, err := i.db.BeginStateTransaction(i.ctx)
if err != nil {
return "0x0", types.NewRPCError(types.DefaultErrorCode, fmt.Sprintf("failed to begin dbTx, error: %s", err))
}
err = i.ethTxManager.Add(ctx, ethTxManOwner, signedTx.Tx.Hash().Hex(), i.interopAdminAddr, &signedTx.Tx.L1Contract, nil, l1TxData, dbTx)

_, err = i.executor.Settle(signedTx, dbTx)
if err != nil {
if errRollback := dbTx.Rollback(ctx); errRollback != nil {
if errRollback := dbTx.Rollback(i.ctx); errRollback != nil {
log.Error("rollback err: ", errRollback)
}
return "0x0", types.NewRPCError(types.DefaultErrorCode, fmt.Sprintf("failed to add tx to ethTxMan, error: %s", err))
}
if err := dbTx.Commit(ctx); err != nil {
if err := dbTx.Commit(i.ctx); err != nil {
return "0x0", types.NewRPCError(types.DefaultErrorCode, fmt.Sprintf("failed to commit dbTx, error: %s", err))
}
log.Debugf("successfuly added tx %s to ethTxMan", signedTx.Tx.Hash().Hex())

return signedTx.Tx.Hash(), nil
}

func (i *InteropEndpoints) GetTxStatus(hash common.Hash) (result interface{}, err types.Error) {
ctx, cancel := context.WithTimeout(context.Background(), i.rpcTimeout)
defer cancel()

dbTx, innerErr := i.db.BeginStateTransaction(ctx)
dbTx, innerErr := i.db.BeginStateTransaction(i.ctx)
if innerErr != nil {
result = "0x0"
err = types.NewRPCError(types.DefaultErrorCode, fmt.Sprintf("failed to begin dbTx, error: %s", innerErr))
Expand All @@ -161,13 +93,13 @@ func (i *InteropEndpoints) GetTxStatus(hash common.Hash) (result interface{}, er
}

defer func() {
if innerErr := dbTx.Rollback(ctx); innerErr != nil {
if innerErr := dbTx.Rollback(i.ctx); innerErr != nil {
result = "0x0"
err = types.NewRPCError(types.DefaultErrorCode, fmt.Sprintf("failed to rollback dbTx, error: %s", innerErr))
}
}()

res, innerErr := i.ethTxManager.Result(ctx, ethTxManOwner, hash.Hex(), dbTx)
res, innerErr := i.executor.GetTxStatus(i.ctx, hash, dbTx)
if innerErr != nil {
result = "0x0"
err = types.NewRPCError(types.DefaultErrorCode, fmt.Sprintf("failed to get tx, error: %s", innerErr))
Expand Down

0 comments on commit 0c436fc

Please sign in to comment.