Skip to content

Commit

Permalink
Supervisor: Safety Index (#12154)
Browse files Browse the repository at this point in the history
* fixes

* op-supervisor: head db init fix, logging, op-node debug logging

* interop: track recent safety data

* Early integration and refactor of Views and SafetyIndex

* update for rebase

* rename RecentSafetyIndex ; reorganize

* refactor Pointer method on iterator

* logging

* Delete unused Tracking Code ; New ChainsDB.Safest

* fix naming miss

* fix mistaken line deletion

* Update op-supervisor/supervisor/backend/safety/safety.go

Co-authored-by: protolambda <[email protected]>

* Add issue numbers to TODO ; Address Proto Comments

---------

Co-authored-by: protolambda <[email protected]>
  • Loading branch information
axelKingsley and protolambda authored Sep 27, 2024
1 parent 644dc2b commit 289cd71
Show file tree
Hide file tree
Showing 17 changed files with 497 additions and 648 deletions.
2 changes: 1 addition & 1 deletion op-e2e/interop/interop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,6 @@ func TestInteropTrivial(t *testing.T) {

fmt.Println("Result of emitting event:", rec)

time.Sleep(10 * time.Second)
time.Sleep(60 * time.Second)

}
2 changes: 1 addition & 1 deletion op-e2e/interop/supersystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func (s *interopE2ESystem) SupervisorClient() *sources.SupervisorClient {
// their creation can't be safely skipped or reordered at this time
func (s *interopE2ESystem) prepare(t *testing.T, w worldResourcePaths) {
s.t = t
s.logger = testlog.Logger(s.t, log.LevelInfo)
s.logger = testlog.Logger(s.t, log.LevelDebug)
s.hdWallet = s.prepareHDWallet()
s.worldDeployment, s.worldOutput = s.prepareWorld(w)

Expand Down
3 changes: 3 additions & 0 deletions op-node/rollup/interop/interop.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func (d *InteropDeriver) OnEvent(ev event.Event) bool {
d.emitter.Emit(engine.PromoteCrossUnsafeEvent{Ref: candidate})
}
case engine.LocalSafeUpdateEvent:
d.log.Debug("Local safe update event", "block", x.Ref.Hash, "derivedFrom", x.DerivedFrom)
d.derivedFrom[x.Ref.Hash] = x.DerivedFrom
d.emitter.Emit(engine.RequestCrossSafeEvent{})
case engine.CrossSafeUpdateEvent:
Expand All @@ -132,10 +133,12 @@ func (d *InteropDeriver) OnEvent(ev event.Event) bool {
}
derivedFrom, ok := d.derivedFrom[candidate.Hash]
if !ok {
d.log.Warn("Unknown block candidate source, cannot promote block safety", "block", candidate, "safety", blockSafety)
break
}
switch blockSafety {
case types.CrossSafe:
d.log.Info("Verified cross-safe block", "block", candidate, "derivedFrom", derivedFrom)
// TODO(#11673): once we have interop reorg support, we need to clean stale blocks also.
delete(d.derivedFrom, candidate.Hash)
d.emitter.Emit(engine.PromoteSafeEvent{
Expand Down
4 changes: 4 additions & 0 deletions op-node/rollup/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,22 @@ func (st *StatusTracker) OnEvent(ev event.Event) bool {

switch x := ev.(type) {
case engine.ForkchoiceUpdateEvent:
st.log.Debug("Forkchoice update", "unsafe", x.UnsafeL2Head, "safe", x.SafeL2Head, "finalized", x.FinalizedL2Head)
st.data.UnsafeL2 = x.UnsafeL2Head
st.data.SafeL2 = x.SafeL2Head
st.data.FinalizedL2 = x.FinalizedL2Head
case engine.PendingSafeUpdateEvent:
st.data.UnsafeL2 = x.Unsafe
st.data.PendingSafeL2 = x.PendingSafe
case engine.CrossUnsafeUpdateEvent:
st.log.Debug("Cross unsafe head updated", "cross_unsafe", x.CrossUnsafe, "local_unsafe", x.LocalUnsafe)
st.data.CrossUnsafeL2 = x.CrossUnsafe
st.data.UnsafeL2 = x.LocalUnsafe
case engine.LocalSafeUpdateEvent:
st.log.Debug("Local safe head updated", "local_safe", x.Ref)
st.data.LocalSafeL2 = x.Ref
case engine.CrossSafeUpdateEvent:
st.log.Debug("Cross safe head updated", "cross_safe", x.CrossSafe, "local_safe", x.LocalSafe)
st.data.SafeL2 = x.CrossSafe
st.data.LocalSafeL2 = x.LocalSafe
case derive.DeriverL1StatusEvent:
Expand Down
56 changes: 3 additions & 53 deletions op-supervisor/supervisor/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"path/filepath"
"sync/atomic"
"time"

Expand All @@ -18,7 +17,6 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/source"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
Expand All @@ -33,8 +31,6 @@ type SupervisorBackend struct {

chainMonitors map[types.ChainID]*source.ChainMonitor
db *db.ChainsDB

maintenanceCancel context.CancelFunc
}

var _ frontend.Backend = (*SupervisorBackend)(nil)
Expand All @@ -47,14 +43,8 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
return nil, err
}

// create the head tracker
headTracker, err := heads.NewHeadTracker(logger, filepath.Join(cfg.Datadir, "heads.json"))
if err != nil {
return nil, fmt.Errorf("failed to load existing heads: %w", err)
}

// create the chains db
db := db.NewChainsDB(map[types.ChainID]db.LogStorage{}, headTracker, logger)
db := db.NewChainsDB(map[types.ChainID]db.LogStorage{}, logger)

// create an empty map of chain monitors
chainMonitors := make(map[types.ChainID]*source.ChainMonitor, len(cfg.L2RPCs))
Expand Down Expand Up @@ -145,10 +135,6 @@ func (su *SupervisorBackend) Start(ctx context.Context) error {
return fmt.Errorf("failed to start chain monitor: %w", err)
}
}
// start db maintenance loop
maintenanceCtx, cancel := context.WithCancel(context.Background())
su.db.StartCrossHeadMaintenance(maintenanceCtx)
su.maintenanceCancel = cancel
return nil
}

Expand All @@ -158,8 +144,6 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error {
if !su.started.CompareAndSwap(true, false) {
return errAlreadyStopped
}
// signal the maintenance loop to stop
su.maintenanceCancel()
// collect errors from stopping chain monitors
var errs error
for _, monitor := range su.chainMonitors {
Expand Down Expand Up @@ -200,24 +184,7 @@ func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHa
if err != nil {
return types.Invalid, fmt.Errorf("failed to check log: %w", err)
}
safest := types.CrossUnsafe
// at this point we have the log entry, and we can check if it is safe by various criteria
for _, checker := range []db.SafetyChecker{
db.NewSafetyChecker(db.Unsafe, su.db),
db.NewSafetyChecker(db.Safe, su.db),
db.NewSafetyChecker(db.Finalized, su.db),
} {
// check local safety limit first as it's more permissive
localPtr := checker.LocalHead(chainID)
if localPtr.WithinRange(blockNum, uint32(logIdx)) {
safest = checker.LocalSafetyLevel()
}
// check cross safety level
crossPtr := checker.CrossHead(chainID)
if crossPtr.WithinRange(blockNum, uint32(logIdx)) {
safest = checker.CrossSafetyLevel()
}
}
safest := su.db.Safest(chainID, blockNum, uint32(logIdx))
return safest, nil
}

Expand All @@ -243,7 +210,6 @@ func (su *SupervisorBackend) CheckMessages(
// The block is considered safe if all logs in the block are safe
// this is decided by finding the last log in the block and
func (su *SupervisorBackend) CheckBlock(chainID *hexutil.U256, blockHash common.Hash, blockNumber hexutil.Uint64) (types.SafetyLevel, error) {
safest := types.CrossUnsafe
// find the last log index in the block
id := eth.BlockID{Hash: blockHash, Number: uint64(blockNumber)}
_, err := su.db.FindSealedBlock(types.ChainID(*chainID), id)
Expand All @@ -257,22 +223,6 @@ func (su *SupervisorBackend) CheckBlock(chainID *hexutil.U256, blockHash common.
su.logger.Error("failed to scan block", "err", err)
return "", err
}
// at this point we have the extent of the block, and we can check if it is safe by various criteria
for _, checker := range []db.SafetyChecker{
db.NewSafetyChecker(db.Unsafe, su.db),
db.NewSafetyChecker(db.Safe, su.db),
db.NewSafetyChecker(db.Finalized, su.db),
} {
// check local safety limit first as it's more permissive
localPtr := checker.LocalHead(types.ChainID(*chainID))
if localPtr.IsSealed(uint64(blockNumber)) {
safest = checker.LocalSafetyLevel()
}
// check cross safety level
crossPtr := checker.CrossHead(types.ChainID(*chainID))
if crossPtr.IsSealed(uint64(blockNumber)) {
safest = checker.CrossSafetyLevel()
}
}
safest := su.db.Safest(types.ChainID(*chainID), uint64(blockNumber), 0)
return safest, nil
}
Loading

0 comments on commit 289cd71

Please sign in to comment.