diff --git a/op-conductor/conductor/config.go b/op-conductor/conductor/config.go index 2d356a6553e1..ca18de7d1a18 100644 --- a/op-conductor/conductor/config.go +++ b/op-conductor/conductor/config.go @@ -3,6 +3,7 @@ package conductor import ( "fmt" "math" + "time" "github.com/ethereum/go-ethereum/log" "github.com/pkg/errors" @@ -33,6 +34,15 @@ type Config struct { // RaftBootstrap is true if this node should bootstrap a new raft cluster. RaftBootstrap bool + // RaftSnapshotInterval is the interval to check if a snapshot should be taken. + RaftSnapshotInterval time.Duration + + // RaftSnapshotThreshold is the number of logs to trigger a snapshot. + RaftSnapshotThreshold uint64 + + // RaftTrailingLogs is the number of logs to keep after a snapshot. + RaftTrailingLogs uint64 + // NodeRPC is the HTTP provider URL for op-node. NodeRPC string @@ -107,14 +117,17 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*Config, error) { } return &Config{ - ConsensusAddr: ctx.String(flags.ConsensusAddr.Name), - ConsensusPort: ctx.Int(flags.ConsensusPort.Name), - RaftBootstrap: ctx.Bool(flags.RaftBootstrap.Name), - RaftServerID: ctx.String(flags.RaftServerID.Name), - RaftStorageDir: ctx.String(flags.RaftStorageDir.Name), - NodeRPC: ctx.String(flags.NodeRPC.Name), - ExecutionRPC: ctx.String(flags.ExecutionRPC.Name), - Paused: ctx.Bool(flags.Paused.Name), + ConsensusAddr: ctx.String(flags.ConsensusAddr.Name), + ConsensusPort: ctx.Int(flags.ConsensusPort.Name), + RaftBootstrap: ctx.Bool(flags.RaftBootstrap.Name), + RaftServerID: ctx.String(flags.RaftServerID.Name), + RaftStorageDir: ctx.String(flags.RaftStorageDir.Name), + RaftSnapshotInterval: ctx.Duration(flags.RaftSnapshotInterval.Name), + RaftSnapshotThreshold: ctx.Uint64(flags.RaftSnapshotThreshold.Name), + RaftTrailingLogs: ctx.Uint64(flags.RaftTrailingLogs.Name), + NodeRPC: ctx.String(flags.NodeRPC.Name), + ExecutionRPC: ctx.String(flags.ExecutionRPC.Name), + Paused: ctx.Bool(flags.Paused.Name), HealthCheck: HealthCheckConfig{ Interval: ctx.Uint64(flags.HealthCheckInterval.Name), UnsafeInterval: ctx.Uint64(flags.HealthCheckUnsafeInterval.Name), diff --git a/op-conductor/conductor/service.go b/op-conductor/conductor/service.go index 2a39193c6bcc..d2eb4fe89d9d 100644 --- a/op-conductor/conductor/service.go +++ b/op-conductor/conductor/service.go @@ -149,7 +149,17 @@ func (c *OpConductor) initConsensus(ctx context.Context) error { } serverAddr := fmt.Sprintf("%s:%d", c.cfg.ConsensusAddr, c.cfg.ConsensusPort) - cons, err := consensus.NewRaftConsensus(c.log, c.cfg.RaftServerID, serverAddr, c.cfg.RaftStorageDir, c.cfg.RaftBootstrap, &c.cfg.RollupCfg) + raftConsensusConfig := &consensus.RaftConsensusConfig{ + ServerID: c.cfg.RaftServerID, + ServerAddr: serverAddr, + StorageDir: c.cfg.RaftStorageDir, + Bootstrap: c.cfg.RaftBootstrap, + RollupCfg: &c.cfg.RollupCfg, + SnapshotInterval: c.cfg.RaftSnapshotInterval, + SnapshotThreshold: c.cfg.RaftSnapshotThreshold, + TrailingLogs: c.cfg.RaftTrailingLogs, + } + cons, err := consensus.NewRaftConsensus(c.log, raftConsensusConfig) if err != nil { return errors.Wrap(err, "failed to create raft consensus") } diff --git a/op-conductor/consensus/raft.go b/op-conductor/consensus/raft.go index c534d0305847..f6acc0fb76f1 100644 --- a/op-conductor/consensus/raft.go +++ b/op-conductor/consensus/raft.go @@ -32,6 +32,17 @@ type RaftConsensus struct { unsafeTracker *unsafeHeadTracker } +type RaftConsensusConfig struct { + ServerID string + ServerAddr string + StorageDir string + Bootstrap bool + RollupCfg *rollup.Config + SnapshotInterval time.Duration + SnapshotThreshold uint64 + TrailingLogs uint64 +} + // checkTCPPortOpen attempts to connect to the specified address and returns an error if the connection fails. func checkTCPPortOpen(address string) error { conn, err := net.DialTimeout("tcp", address, 5*time.Second) @@ -43,11 +54,14 @@ func checkTCPPortOpen(address string) error { } // NewRaftConsensus creates a new RaftConsensus instance. -func NewRaftConsensus(log log.Logger, serverID, serverAddr, storageDir string, bootstrap bool, rollupCfg *rollup.Config) (*RaftConsensus, error) { +func NewRaftConsensus(log log.Logger, cfg *RaftConsensusConfig) (*RaftConsensus, error) { rc := raft.DefaultConfig() - rc.LocalID = raft.ServerID(serverID) + rc.SnapshotInterval = cfg.SnapshotInterval + rc.TrailingLogs = cfg.TrailingLogs + rc.SnapshotThreshold = cfg.SnapshotThreshold + rc.LocalID = raft.ServerID(cfg.ServerID) - baseDir := filepath.Join(storageDir, serverID) + baseDir := filepath.Join(cfg.StorageDir, cfg.ServerID) if _, err := os.Stat(baseDir); os.IsNotExist(err) { if err := os.MkdirAll(baseDir, 0o755); err != nil { return nil, fmt.Errorf("error creating storage dir: %w", err) @@ -72,7 +86,7 @@ func NewRaftConsensus(log log.Logger, serverID, serverAddr, storageDir string, b return nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q): %w`, baseDir, err) } - addr, err := net.ResolveTCPAddr("tcp", serverAddr) + addr, err := net.ResolveTCPAddr("tcp", cfg.ServerAddr) if err != nil { return nil, errors.Wrap(err, "failed to resolve tcp address") } @@ -95,18 +109,18 @@ func NewRaftConsensus(log log.Logger, serverID, serverAddr, storageDir string, b // If bootstrap = true, start raft in bootstrap mode, this will allow the current node to elect itself as leader when there's no other participants // and allow other nodes to join the cluster. - if bootstrap { - cfg := raft.Configuration{ + if cfg.Bootstrap { + raftCfg := raft.Configuration{ Servers: []raft.Server{ { ID: rc.LocalID, - Address: raft.ServerAddress(serverAddr), + Address: raft.ServerAddress(cfg.ServerAddr), Suffrage: raft.Voter, }, }, } - f := r.BootstrapCluster(cfg) + f := r.BootstrapCluster(raftCfg) if err := f.Error(); err != nil { return nil, errors.Wrap(err, "failed to bootstrap raft cluster") } @@ -115,9 +129,9 @@ func NewRaftConsensus(log log.Logger, serverID, serverAddr, storageDir string, b return &RaftConsensus{ log: log, r: r, - serverID: raft.ServerID(serverID), + serverID: raft.ServerID(cfg.ServerID), unsafeTracker: fsm, - rollupCfg: rollupCfg, + rollupCfg: cfg.RollupCfg, }, nil } diff --git a/op-conductor/consensus/raft_test.go b/op-conductor/consensus/raft_test.go index 332bbd203e7e..fbd9c7cb3bc8 100644 --- a/op-conductor/consensus/raft_test.go +++ b/op-conductor/consensus/raft_test.go @@ -18,9 +18,6 @@ import ( func TestCommitAndRead(t *testing.T) { log := testlog.Logger(t, log.LevelInfo) - serverID := "SequencerA" - serverAddr := "127.0.0.1:0" - bootstrap := true now := uint64(time.Now().Unix()) rollupCfg := &rollup.Config{ CanyonTime: &now, @@ -29,8 +26,18 @@ func TestCommitAndRead(t *testing.T) { if err := os.RemoveAll(storageDir); err != nil { t.Fatal(err) } + raftConsensusConfig := &RaftConsensusConfig{ + ServerID: "SequencerA", + ServerAddr: "127.0.0.1:0", + StorageDir: storageDir, + Bootstrap: true, + RollupCfg: rollupCfg, + SnapshotInterval: 120 * time.Second, + SnapshotThreshold: 10240, + TrailingLogs: 8192, + } - cons, err := NewRaftConsensus(log, serverID, serverAddr, storageDir, bootstrap, rollupCfg) + cons, err := NewRaftConsensus(log, raftConsensusConfig) require.NoError(t, err) // wait till it became leader diff --git a/op-conductor/flags/flags.go b/op-conductor/flags/flags.go index 4870ba4a3ab0..249e8a676e07 100644 --- a/op-conductor/flags/flags.go +++ b/op-conductor/flags/flags.go @@ -2,6 +2,7 @@ package flags import ( "fmt" + "time" "github.com/urfave/cli/v2" @@ -44,6 +45,24 @@ var ( Usage: "Directory to store raft data", EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "RAFT_STORAGE_DIR"), } + RaftSnapshotInterval = &cli.DurationFlag{ + Name: "raft.snapshot-interval", + Usage: "The interval to check if a snapshot should be taken.", + EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "RAFT_SNAPSHOT_INTERVAL"), + Value: 120 * time.Second, + } + RaftSnapshotThreshold = &cli.Uint64Flag{ + Name: "raft.snapshot-threshold", + Usage: "Number of logs to trigger a snapshot", + EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "RAFT_SNAPSHOT_THRESHOLD"), + Value: 8192, + } + RaftTrailingLogs = &cli.Uint64Flag{ + Name: "raft.trailing-logs", + Usage: "Number of logs to keep after a snapshot", + EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "RAFT_TRAILING_LOGS"), + Value: 10240, + } NodeRPC = &cli.StringFlag{ Name: "node.rpc", Usage: "HTTP provider URL for op-node", @@ -113,6 +132,9 @@ var optionalFlags = []cli.Flag{ RaftBootstrap, HealthCheckSafeEnabled, HealthCheckSafeInterval, + RaftSnapshotInterval, + RaftSnapshotThreshold, + RaftTrailingLogs, } func init() { diff --git a/op-e2e/sequencer_failover_setup.go b/op-e2e/sequencer_failover_setup.go index db4e05bff433..890c6269bdf1 100644 --- a/op-e2e/sequencer_failover_setup.go +++ b/op-e2e/sequencer_failover_setup.go @@ -208,14 +208,17 @@ func setupConductor( ) (*conductor, error) { consensusPort := findAvailablePort(t) cfg := con.Config{ - ConsensusAddr: localhost, - ConsensusPort: consensusPort, - RaftServerID: serverID, - RaftStorageDir: dir, - RaftBootstrap: bootstrap, - NodeRPC: nodeRPC, - ExecutionRPC: engineRPC, - Paused: true, + ConsensusAddr: localhost, + ConsensusPort: consensusPort, + RaftServerID: serverID, + RaftStorageDir: dir, + RaftBootstrap: bootstrap, + RaftSnapshotInterval: 120 * time.Second, + RaftSnapshotThreshold: 8192, + RaftTrailingLogs: 10240, + NodeRPC: nodeRPC, + ExecutionRPC: engineRPC, + Paused: true, HealthCheck: con.HealthCheckConfig{ Interval: 1, // per test setup, l2 block time is 1s. MinPeerCount: 2, // per test setup, each sequencer has 2 peers