Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

event log #1989

Merged
merged 14 commits into from
Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 82 additions & 19 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import (
"github.com/0xPolygonHermez/zkevm-node/db"
"github.com/0xPolygonHermez/zkevm-node/etherman"
"github.com/0xPolygonHermez/zkevm-node/ethtxmanager"
"github.com/0xPolygonHermez/zkevm-node/event"
"github.com/0xPolygonHermez/zkevm-node/event/nileventstorage"
"github.com/0xPolygonHermez/zkevm-node/event/pgeventstorage"
"github.com/0xPolygonHermez/zkevm-node/gasprice"
"github.com/0xPolygonHermez/zkevm-node/jsonrpc"
"github.com/0xPolygonHermez/zkevm-node/jsonrpc/client"
Expand Down Expand Up @@ -72,6 +75,24 @@ func start(cliCtx *cli.Context) error {
}
checkStateMigrations(c.StateDB)

// Prepare event log
var eventLog *event.EventLog
var eventStorage event.Storage

if c.EventLog.DB.Name != "disabled" {
agnusmor marked this conversation as resolved.
Show resolved Hide resolved
eventStorage, err = pgeventstorage.NewPostgresEventStorage(c.EventLog.DB)
if err != nil {
log.Fatal(err)
}
} else {
eventStorage, err = nileventstorage.NewNilEventStorage()
if err != nil {
log.Fatal(err)
}
}

eventLog = event.NewEventLog(c.EventLog, eventStorage)

stateSqlDB, err := db.NewSQLDB(c.StateDB)
if err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -105,7 +126,7 @@ func start(cliCtx *cli.Context) error {
log.Infof("Chain ID read from POE SC = %v", l2ChainID)

ctx := context.Background()
st := newState(ctx, c, l2ChainID, forkIDIntervals, stateSqlDB)
st := newState(ctx, c, l2ChainID, forkIDIntervals, stateSqlDB, eventLog)

ethTxManagerStorage, err := ethtxmanager.NewPostgresStorage(c.StateDB)
if err != nil {
Expand All @@ -114,19 +135,41 @@ func start(cliCtx *cli.Context) error {

etm := ethtxmanager.New(c.EthTxManager, etherman, ethTxManagerStorage, st)

ev := &event.Event{
ReceivedAt: time.Now(),
Source: event.Source_Node,
Level: event.Level_Info,
EventID: event.EventID_NodeComponentStarted,
}

for _, component := range components {
switch component {
case AGGREGATOR:
log.Info("Running aggregator")
ev.Component = event.Component_Aggregator
ev.Description = "Running aggregator"
err := eventLog.LogEvent(ctx, ev)
if err != nil {
log.Fatal(err)
}
go runAggregator(ctx, c.Aggregator, etherman, etm, st)
case SEQUENCER:
log.Info("Running sequencer")
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st)
seq := createSequencer(*c, poolInstance, ethTxManagerStorage, st)
ev.Component = event.Component_Sequencer
ev.Description = "Running sequencer"
err := eventLog.LogEvent(ctx, ev)
if err != nil {
log.Fatal(err)
}
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st, eventLog)
seq := createSequencer(*c, poolInstance, ethTxManagerStorage, st, eventLog)
go seq.Start(ctx)
case RPC:
log.Info("Running JSON-RPC server")
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st)
ev.Component = event.Component_RPC
ev.Description = "Running JSON-RPC server"
err := eventLog.LogEvent(ctx, ev)
if err != nil {
log.Fatal(err)
}
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st, eventLog)
if c.RPC.EnableL2SuggestedGasPricePolling {
// Needed for rejecting transactions with too low gas price
poolInstance.StartPollingMinSuggestedGasPrice(ctx)
Expand All @@ -137,19 +180,39 @@ func start(cliCtx *cli.Context) error {
}
go runJSONRPCServer(*c, poolInstance, st, apis)
case SYNCHRONIZER:
log.Info("Running synchronizer")
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st)
ev.Component = event.Component_Synchronizer
ev.Description = "Running synchronizer"
err := eventLog.LogEvent(ctx, ev)
if err != nil {
log.Fatal(err)
}
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st, eventLog)
go runSynchronizer(*c, etherman, etm, st, poolInstance)
case BROADCAST:
log.Info("Running broadcast service")
ev.Component = event.Component_Broadcast
ev.Description = "Running broadcast service"
err := eventLog.LogEvent(ctx, ev)
if err != nil {
log.Fatal(err)
}
go runBroadcastServer(c.BroadcastServer, st)
case ETHTXMANAGER:
log.Info("Running eth tx manager service")
ev.Component = event.Component_EthTxManager
ev.Description = "Running eth tx manager service"
err := eventLog.LogEvent(ctx, ev)
if err != nil {
log.Fatal(err)
}
etm := createEthTxManager(*c, ethTxManagerStorage, st)
go etm.Start()
case L2GASPRICER:
log.Info("Running L2 gasPricer")
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st)
ev.Component = event.Component_GasPricer
ev.Description = "Running L2 gasPricer"
err := eventLog.LogEvent(ctx, ev)
if err != nil {
log.Fatal(err)
}
poolInstance := createPool(c.Pool, c.NetworkConfig.L2BridgeAddr, l2ChainID, st, eventLog)
go runL2GasPriceSuggester(c.L2GasPriceSuggester, st, poolInstance, etherman)
}
}
Expand Down Expand Up @@ -236,7 +299,7 @@ func runJSONRPCServer(c config.Config, pool *pool.Pool, st *state.State, apis ma
}
}

func createSequencer(cfg config.Config, pool *pool.Pool, etmStorage *ethtxmanager.PostgresStorage, st *state.State) *sequencer.Sequencer {
func createSequencer(cfg config.Config, pool *pool.Pool, etmStorage *ethtxmanager.PostgresStorage, st *state.State, eventLog *event.EventLog) *sequencer.Sequencer {
etherman, err := newEtherman(cfg)
if err != nil {
log.Fatal(err)
Expand All @@ -251,7 +314,7 @@ func createSequencer(cfg config.Config, pool *pool.Pool, etmStorage *ethtxmanage

ethTxManager := ethtxmanager.New(cfg.EthTxManager, etherman, etmStorage, st)

seq, err := sequencer.New(cfg.Sequencer, pool, st, etherman, ethTxManager)
seq, err := sequencer.New(cfg.Sequencer, pool, st, etherman, ethTxManager, eventLog)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -302,7 +365,7 @@ func waitSignal(cancelFuncs []context.CancelFunc) {
}
}

func newState(ctx context.Context, c *config.Config, l2ChainID uint64, forkIDIntervals []state.ForkIDInterval, sqlDB *pgxpool.Pool) *state.State {
func newState(ctx context.Context, c *config.Config, l2ChainID uint64, forkIDIntervals []state.ForkIDInterval, sqlDB *pgxpool.Pool, eventLog *event.EventLog) *state.State {
stateDb := state.NewPostgresStorage(sqlDB)
executorClient, _, _ := executor.NewExecutorClient(ctx, c.Executor)
stateDBClient, _, _ := merkletree.NewMTDBServiceClient(ctx, c.MTClient)
Expand All @@ -314,17 +377,17 @@ func newState(ctx context.Context, c *config.Config, l2ChainID uint64, forkIDInt
ForkIDIntervals: forkIDIntervals,
}

st := state.NewState(stateCfg, stateDb, executorClient, stateTree)
st := state.NewState(stateCfg, stateDb, executorClient, stateTree, eventLog)
return st
}

func createPool(cfgPool pool.Config, l2BridgeAddr common.Address, l2ChainID uint64, st *state.State) *pool.Pool {
func createPool(cfgPool pool.Config, l2BridgeAddr common.Address, l2ChainID uint64, st *state.State, eventLog *event.EventLog) *pool.Pool {
runPoolMigrations(cfgPool.DB)
poolStorage, err := pgpoolstorage.NewPostgresPoolStorage(cfgPool.DB)
if err != nil {
log.Fatal(err)
}
poolInstance := pool.NewPool(cfgPool, poolStorage, st, l2BridgeAddr, l2ChainID)
poolInstance := pool.NewPool(cfgPool, poolStorage, st, l2BridgeAddr, l2ChainID, eventLog)
return poolInstance
}

Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/0xPolygonHermez/zkevm-node/db"
"github.com/0xPolygonHermez/zkevm-node/etherman"
"github.com/0xPolygonHermez/zkevm-node/ethtxmanager"
"github.com/0xPolygonHermez/zkevm-node/event"
"github.com/0xPolygonHermez/zkevm-node/gasprice"
"github.com/0xPolygonHermez/zkevm-node/jsonrpc"
"github.com/0xPolygonHermez/zkevm-node/log"
Expand Down Expand Up @@ -67,6 +68,7 @@ type Config struct {
MTClient merkletree.Config
StateDB db.Config
Metrics metrics.Config
EventLog event.Config
}

// Default parses the default configuration values.
Expand Down
4 changes: 4 additions & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,8 @@ Port = 61090
Host = "0.0.0.0"
Port = 9091
Enabled = false

[EventLog]
[EventLog.DB]
Name = "disabled"
`
1 change: 0 additions & 1 deletion config/environments/local/local.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,3 @@ Enabled = false
ProfilingHost = "0.0.0.0"
ProfilingPort = 6060
ProfilingEnabled = false

20 changes: 20 additions & 0 deletions db/migrations/state/0005.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- +migrate Up
DROP table state.event;
DROP table state.debug;

-- +migrate Down
CREATE TABLE state.event
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to use IF NOT EXIST to avoid problems. I know this situation shouldn't happen in theory

(
event_type VARCHAR NOT NULL,
timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
ip VARCHAR,
tx_hash VARCHAR,
payload VARCHAR
);

CREATE TABLE state.debug
(
error_type VARCHAR,
timestamp timestamp,
payload VARCHAR
);
14 changes: 14 additions & 0 deletions db/scripts/init_event_db.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
CREATE TYPE level_t AS ENUM ('emerg', 'alert', 'crit', 'err', 'warning', 'notice', 'info', 'debug');

CREATE TABLE public.event (
id BIGSERIAL PRIMARY KEY,
received_at timestamp WITH TIME ZONE default CURRENT_TIMESTAMP,
ip_address inet,
source varchar(32) not null,
component varchar(32),
level level_t not null,
event_id varchar(32) not null,
description text,
data bytea,
json jsonb
);
9 changes: 9 additions & 0 deletions event/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package event

import "github.com/0xPolygonHermez/zkevm-node/db"

// Config for event
type Config struct {
// DB is the database configuration
DB db.Config `mapstructure:"DB"`
}
88 changes: 88 additions & 0 deletions event/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package event

import (
"math/big"
"time"
)

// EventID is the ID of the event
type EventID string

// Source is the source of the event
type Source string

// Component is the component that triggered the event
type Component string

// Level is the level of the event
type Level string

const (
// EventID_NodeComponentStarted is triggered when the node starts
EventID_NodeComponentStarted = "NODE COMPONENT STARTED"
// EventID_PreexecutionOOC is triggered when an OOC error is detected during the preexecution
EventID_PreexecutionOOC EventID = "PRE EXECUTION OOC"
// EventID_PreexecutionOOG is triggered when an OOG error is detected during the preexecution
EventID_PreexecutionOOG EventID = "PRE EXECUTION OOG"
// EventID_ExecutorError is triggered when an error is detected during the execution
EventID_ExecutorError EventID = "EXECUTOR ERROR"
// EventID_ReprocessFullBatchOOC is triggered when an OOC error is detected during the reprocessing of a full batch
EventID_ReprocessFullBatchOOC EventID = "REPROCESS FULL BATCH OOC"
// EventID_ExecutorRLPError is triggered when an RLP error is detected during the execution
EventID_ExecutorRLPError EventID = "EXECUTOR RLP ERROR"
// EventID_FinalizerHalt is triggered when the finalizer halts
EventID_FinalizerHalt EventID = "FINALIZER HALT"

// Source_Node is the source of the event
Source_Node Source = "node"

// Component_RPC is the component that triggered the event
Component_RPC Component = "rpc"
// Component_Pool is the component that triggered the event
Component_Pool Component = "pool"
// Component_Sequencer is the component that triggered the event
Component_Sequencer Component = "sequencer"
// Component_Synchronizer is the component that triggered the event
Component_Synchronizer Component = "synchronizer"
// Component_Aggregator is the component that triggered the event
Component_Aggregator Component = "aggregator"
// Component_EthTxManager is the component that triggered the event
Component_EthTxManager Component = "ethtxmanager"
// Component_GasPricer is the component that triggered the event
Component_GasPricer Component = "gaspricer"
// Component_Executor is the component that triggered the event
Component_Executor Component = "executor"
// Component_Broadcast is the component that triggered the event
Component_Broadcast Component = "broadcast"

// Level_Emergency is the most severe level
Level_Emergency Level = "emerg"
// Level_Alert is the second most severe level
Level_Alert Level = "alert"
// Level_Critical is the third most severe level
Level_Critical Level = "crit"
// Level_Error is the fourth most severe level
Level_Error Level = "err"
// Level_Warning is the fifth most severe level
Level_Warning Level = "warning"
// Level_Notice is the sixth most severe level
Level_Notice Level = "notice"
// Level_Info is the seventh most severe level
Level_Info Level = "info"
// Level_Debug is the least severe level
Level_Debug Level = "debug"
)

// Event represents a event that may be investigated
type Event struct {
Id big.Int
ReceivedAt time.Time
IPAddress string
Source Source
Component Component
Level Level
EventID EventID
Description string
Data []byte
Json interface{}
}
Loading