Skip to content

Commit

Permalink
op-conductor: adds raft log snapshot configs
Browse files Browse the repository at this point in the history
* Refactors NewRaftConsensus param list into a config struct
* Adds configuration support for SnapshotInterval, SnapshotThreshold, TrailingLogs
  • Loading branch information
zhwrd committed Sep 4, 2024
1 parent 375b976 commit 75bd25a
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 31 deletions.
29 changes: 21 additions & 8 deletions op-conductor/conductor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package conductor
import (
"fmt"
"math"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/pkg/errors"
Expand Down Expand Up @@ -33,6 +34,15 @@ type Config struct {
// RaftBootstrap is true if this node should bootstrap a new raft cluster.
RaftBootstrap bool

// RaftSnapshotInterval is the interval to check if a snapshot should be taken.
RaftSnapshotInterval time.Duration

// RaftSnapshotThreshold is the number of logs to trigger a snapshot.
RaftSnapshotThreshold uint64

// RaftTrailingLogs is the number of logs to keep after a snapshot.
RaftTrailingLogs uint64

// NodeRPC is the HTTP provider URL for op-node.
NodeRPC string

Expand Down Expand Up @@ -107,14 +117,17 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*Config, error) {
}

return &Config{
ConsensusAddr: ctx.String(flags.ConsensusAddr.Name),
ConsensusPort: ctx.Int(flags.ConsensusPort.Name),
RaftBootstrap: ctx.Bool(flags.RaftBootstrap.Name),
RaftServerID: ctx.String(flags.RaftServerID.Name),
RaftStorageDir: ctx.String(flags.RaftStorageDir.Name),
NodeRPC: ctx.String(flags.NodeRPC.Name),
ExecutionRPC: ctx.String(flags.ExecutionRPC.Name),
Paused: ctx.Bool(flags.Paused.Name),
ConsensusAddr: ctx.String(flags.ConsensusAddr.Name),
ConsensusPort: ctx.Int(flags.ConsensusPort.Name),
RaftBootstrap: ctx.Bool(flags.RaftBootstrap.Name),
RaftServerID: ctx.String(flags.RaftServerID.Name),
RaftStorageDir: ctx.String(flags.RaftStorageDir.Name),
RaftSnapshotInterval: ctx.Duration(flags.RaftSnapshotInterval.Name),
RaftSnapshotThreshold: ctx.Uint64(flags.RaftSnapshotThreshold.Name),
RaftTrailingLogs: ctx.Uint64(flags.RaftTrailingLogs.Name),
NodeRPC: ctx.String(flags.NodeRPC.Name),
ExecutionRPC: ctx.String(flags.ExecutionRPC.Name),
Paused: ctx.Bool(flags.Paused.Name),
HealthCheck: HealthCheckConfig{
Interval: ctx.Uint64(flags.HealthCheckInterval.Name),
UnsafeInterval: ctx.Uint64(flags.HealthCheckUnsafeInterval.Name),
Expand Down
12 changes: 11 additions & 1 deletion op-conductor/conductor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,17 @@ func (c *OpConductor) initConsensus(ctx context.Context) error {
}

serverAddr := fmt.Sprintf("%s:%d", c.cfg.ConsensusAddr, c.cfg.ConsensusPort)
cons, err := consensus.NewRaftConsensus(c.log, c.cfg.RaftServerID, serverAddr, c.cfg.RaftStorageDir, c.cfg.RaftBootstrap, &c.cfg.RollupCfg)
raftConsensusConfig := &consensus.RaftConsensusConfig{
ServerID: c.cfg.RaftServerID,
ServerAddr: serverAddr,
StorageDir: c.cfg.RaftStorageDir,
Bootstrap: c.cfg.RaftBootstrap,
RollupCfg: &c.cfg.RollupCfg,
SnapshotInterval: c.cfg.RaftSnapshotInterval,
SnapshotThreshold: c.cfg.RaftSnapshotThreshold,
TrailingLogs: c.cfg.RaftTrailingLogs,
}
cons, err := consensus.NewRaftConsensus(c.log, raftConsensusConfig)
if err != nil {
return errors.Wrap(err, "failed to create raft consensus")
}
Expand Down
34 changes: 24 additions & 10 deletions op-conductor/consensus/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ type RaftConsensus struct {
unsafeTracker *unsafeHeadTracker
}

type RaftConsensusConfig struct {
ServerID string
ServerAddr string
StorageDir string
Bootstrap bool
RollupCfg *rollup.Config
SnapshotInterval time.Duration
SnapshotThreshold uint64
TrailingLogs uint64
}

// checkTCPPortOpen attempts to connect to the specified address and returns an error if the connection fails.
func checkTCPPortOpen(address string) error {
conn, err := net.DialTimeout("tcp", address, 5*time.Second)
Expand All @@ -43,11 +54,14 @@ func checkTCPPortOpen(address string) error {
}

// NewRaftConsensus creates a new RaftConsensus instance.
func NewRaftConsensus(log log.Logger, serverID, serverAddr, storageDir string, bootstrap bool, rollupCfg *rollup.Config) (*RaftConsensus, error) {
func NewRaftConsensus(log log.Logger, cfg *RaftConsensusConfig) (*RaftConsensus, error) {
rc := raft.DefaultConfig()
rc.LocalID = raft.ServerID(serverID)
rc.SnapshotInterval = cfg.SnapshotInterval
rc.TrailingLogs = cfg.TrailingLogs
rc.SnapshotThreshold = cfg.SnapshotThreshold
rc.LocalID = raft.ServerID(cfg.ServerID)

baseDir := filepath.Join(storageDir, serverID)
baseDir := filepath.Join(cfg.StorageDir, cfg.ServerID)
if _, err := os.Stat(baseDir); os.IsNotExist(err) {
if err := os.MkdirAll(baseDir, 0o755); err != nil {
return nil, fmt.Errorf("error creating storage dir: %w", err)
Expand All @@ -72,7 +86,7 @@ func NewRaftConsensus(log log.Logger, serverID, serverAddr, storageDir string, b
return nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q): %w`, baseDir, err)
}

addr, err := net.ResolveTCPAddr("tcp", serverAddr)
addr, err := net.ResolveTCPAddr("tcp", cfg.ServerAddr)
if err != nil {
return nil, errors.Wrap(err, "failed to resolve tcp address")
}
Expand All @@ -95,18 +109,18 @@ func NewRaftConsensus(log log.Logger, serverID, serverAddr, storageDir string, b

// If bootstrap = true, start raft in bootstrap mode, this will allow the current node to elect itself as leader when there's no other participants
// and allow other nodes to join the cluster.
if bootstrap {
cfg := raft.Configuration{
if cfg.Bootstrap {
raftCfg := raft.Configuration{
Servers: []raft.Server{
{
ID: rc.LocalID,
Address: raft.ServerAddress(serverAddr),
Address: raft.ServerAddress(cfg.ServerAddr),
Suffrage: raft.Voter,
},
},
}

f := r.BootstrapCluster(cfg)
f := r.BootstrapCluster(raftCfg)
if err := f.Error(); err != nil {
return nil, errors.Wrap(err, "failed to bootstrap raft cluster")
}
Expand All @@ -115,9 +129,9 @@ func NewRaftConsensus(log log.Logger, serverID, serverAddr, storageDir string, b
return &RaftConsensus{
log: log,
r: r,
serverID: raft.ServerID(serverID),
serverID: raft.ServerID(cfg.ServerID),
unsafeTracker: fsm,
rollupCfg: rollupCfg,
rollupCfg: cfg.RollupCfg,
}, nil
}

Expand Down
15 changes: 11 additions & 4 deletions op-conductor/consensus/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ import (

func TestCommitAndRead(t *testing.T) {
log := testlog.Logger(t, log.LevelInfo)
serverID := "SequencerA"
serverAddr := "127.0.0.1:0"
bootstrap := true
now := uint64(time.Now().Unix())
rollupCfg := &rollup.Config{
CanyonTime: &now,
Expand All @@ -29,8 +26,18 @@ func TestCommitAndRead(t *testing.T) {
if err := os.RemoveAll(storageDir); err != nil {
t.Fatal(err)
}
raftConsensusConfig := &RaftConsensusConfig{
ServerID: "SequencerA",
ServerAddr: "127.0.0.1:0",
StorageDir: storageDir,
Bootstrap: true,
RollupCfg: rollupCfg,
SnapshotInterval: 120 * time.Second,
SnapshotThreshold: 10240,
TrailingLogs: 8192,
}

cons, err := NewRaftConsensus(log, serverID, serverAddr, storageDir, bootstrap, rollupCfg)
cons, err := NewRaftConsensus(log, raftConsensusConfig)
require.NoError(t, err)

// wait till it became leader
Expand Down
22 changes: 22 additions & 0 deletions op-conductor/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package flags

import (
"fmt"
"time"

"github.com/urfave/cli/v2"

Expand Down Expand Up @@ -44,6 +45,24 @@ var (
Usage: "Directory to store raft data",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "RAFT_STORAGE_DIR"),
}
RaftSnapshotInterval = &cli.DurationFlag{
Name: "raft.snapshot-interval",
Usage: "The interval to check if a snapshot should be taken.",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "RAFT_SNAPSHOT_INTERVAL"),
Value: 120 * time.Second,
}
RaftSnapshotThreshold = &cli.Uint64Flag{
Name: "raft.snapshot-threshold",
Usage: "Number of logs to trigger a snapshot",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "RAFT_SNAPSHOT_THRESHOLD"),
Value: 8192,
}
RaftTrailingLogs = &cli.Uint64Flag{
Name: "raft.trailing-logs",
Usage: "Number of logs to keep after a snapshot",
EnvVars: opservice.PrefixEnvVar(EnvVarPrefix, "RAFT_TRAILING_LOGS"),
Value: 10240,
}
NodeRPC = &cli.StringFlag{
Name: "node.rpc",
Usage: "HTTP provider URL for op-node",
Expand Down Expand Up @@ -113,6 +132,9 @@ var optionalFlags = []cli.Flag{
RaftBootstrap,
HealthCheckSafeEnabled,
HealthCheckSafeInterval,
RaftSnapshotInterval,
RaftSnapshotThreshold,
RaftTrailingLogs,
}

func init() {
Expand Down
19 changes: 11 additions & 8 deletions op-e2e/sequencer_failover_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,17 @@ func setupConductor(
) (*conductor, error) {
consensusPort := findAvailablePort(t)
cfg := con.Config{
ConsensusAddr: localhost,
ConsensusPort: consensusPort,
RaftServerID: serverID,
RaftStorageDir: dir,
RaftBootstrap: bootstrap,
NodeRPC: nodeRPC,
ExecutionRPC: engineRPC,
Paused: true,
ConsensusAddr: localhost,
ConsensusPort: consensusPort,
RaftServerID: serverID,
RaftStorageDir: dir,
RaftBootstrap: bootstrap,
RaftSnapshotInterval: 120 * time.Second,
RaftSnapshotThreshold: 8192,
RaftTrailingLogs: 10240,
NodeRPC: nodeRPC,
ExecutionRPC: engineRPC,
Paused: true,
HealthCheck: con.HealthCheckConfig{
Interval: 1, // per test setup, l2 block time is 1s.
MinPeerCount: 2, // per test setup, each sequencer has 2 peers
Expand Down

0 comments on commit 75bd25a

Please sign in to comment.