From 289cd71be84c19d441e1def19c4e3a7aac7cbd7d Mon Sep 17 00:00:00 2001 From: Axel Kingsley Date: Fri, 27 Sep 2024 17:12:41 -0500 Subject: [PATCH] Supervisor: Safety Index (#12154) * fixes * op-supervisor: head db init fix, logging, op-node debug logging * interop: track recent safety data * Early integration and refactor of Views and SafetyIndex * update for rebase * rename RecentSafetyIndex ; reorganize * refactor Pointer method on iterator * logging * Delete unused Tracking Code ; New ChainsDB.Safest * fix naming miss * fix mistaken line deletion * Update op-supervisor/supervisor/backend/safety/safety.go Co-authored-by: protolambda * Add issue numbers to TODO ; Address Proto Comments --------- Co-authored-by: protolambda --- op-e2e/interop/interop_test.go | 2 +- op-e2e/interop/supersystem.go | 2 +- op-node/rollup/interop/interop.go | 3 + op-node/rollup/status/status.go | 4 + op-supervisor/supervisor/backend/backend.go | 56 +--- op-supervisor/supervisor/backend/db/db.go | 261 ++++------------- .../supervisor/backend/db/heads/types.go | 1 + .../supervisor/backend/db/logs/iterator.go | 28 ++ .../supervisor/backend/db/logs/state.go | 13 + .../supervisor/backend/db/safety_checkers.go | 153 ---------- .../backend/db/safety_checkers_test.go | 215 -------------- .../supervisor/backend/safety/safety.go | 270 ++++++++++++++++++ .../supervisor/backend/safety/views.go | 91 ++++++ .../supervisor/backend/source/chain.go | 15 +- .../backend/source/chain_processor.go | 10 +- .../backend/source/log_processor.go | 11 +- .../backend/source/log_processor_test.go | 10 +- 17 files changed, 497 insertions(+), 648 deletions(-) delete mode 100644 op-supervisor/supervisor/backend/db/safety_checkers.go delete mode 100644 op-supervisor/supervisor/backend/db/safety_checkers_test.go create mode 100644 op-supervisor/supervisor/backend/safety/safety.go create mode 100644 op-supervisor/supervisor/backend/safety/views.go diff --git a/op-e2e/interop/interop_test.go b/op-e2e/interop/interop_test.go index 65265c22e7c2..0d593673ecce 100644 --- a/op-e2e/interop/interop_test.go +++ b/op-e2e/interop/interop_test.go @@ -95,6 +95,6 @@ func TestInteropTrivial(t *testing.T) { fmt.Println("Result of emitting event:", rec) - time.Sleep(10 * time.Second) + time.Sleep(60 * time.Second) } diff --git a/op-e2e/interop/supersystem.go b/op-e2e/interop/supersystem.go index ffa91bef97f3..3630b87dc896 100644 --- a/op-e2e/interop/supersystem.go +++ b/op-e2e/interop/supersystem.go @@ -471,7 +471,7 @@ func (s *interopE2ESystem) SupervisorClient() *sources.SupervisorClient { // their creation can't be safely skipped or reordered at this time func (s *interopE2ESystem) prepare(t *testing.T, w worldResourcePaths) { s.t = t - s.logger = testlog.Logger(s.t, log.LevelInfo) + s.logger = testlog.Logger(s.t, log.LevelDebug) s.hdWallet = s.prepareHDWallet() s.worldDeployment, s.worldOutput = s.prepareWorld(w) diff --git a/op-node/rollup/interop/interop.go b/op-node/rollup/interop/interop.go index 35b1a86e9635..152020f09c70 100644 --- a/op-node/rollup/interop/interop.go +++ b/op-node/rollup/interop/interop.go @@ -107,6 +107,7 @@ func (d *InteropDeriver) OnEvent(ev event.Event) bool { d.emitter.Emit(engine.PromoteCrossUnsafeEvent{Ref: candidate}) } case engine.LocalSafeUpdateEvent: + d.log.Debug("Local safe update event", "block", x.Ref.Hash, "derivedFrom", x.DerivedFrom) d.derivedFrom[x.Ref.Hash] = x.DerivedFrom d.emitter.Emit(engine.RequestCrossSafeEvent{}) case engine.CrossSafeUpdateEvent: @@ -132,10 +133,12 @@ func (d *InteropDeriver) OnEvent(ev event.Event) bool { } derivedFrom, ok := d.derivedFrom[candidate.Hash] if !ok { + d.log.Warn("Unknown block candidate source, cannot promote block safety", "block", candidate, "safety", blockSafety) break } switch blockSafety { case types.CrossSafe: + d.log.Info("Verified cross-safe block", "block", candidate, "derivedFrom", derivedFrom) // TODO(#11673): once we have interop reorg support, we need to clean stale blocks also. delete(d.derivedFrom, candidate.Hash) d.emitter.Emit(engine.PromoteSafeEvent{ diff --git a/op-node/rollup/status/status.go b/op-node/rollup/status/status.go index 65121b1294aa..26e9ddbc2197 100644 --- a/op-node/rollup/status/status.go +++ b/op-node/rollup/status/status.go @@ -63,6 +63,7 @@ func (st *StatusTracker) OnEvent(ev event.Event) bool { switch x := ev.(type) { case engine.ForkchoiceUpdateEvent: + st.log.Debug("Forkchoice update", "unsafe", x.UnsafeL2Head, "safe", x.SafeL2Head, "finalized", x.FinalizedL2Head) st.data.UnsafeL2 = x.UnsafeL2Head st.data.SafeL2 = x.SafeL2Head st.data.FinalizedL2 = x.FinalizedL2Head @@ -70,11 +71,14 @@ func (st *StatusTracker) OnEvent(ev event.Event) bool { st.data.UnsafeL2 = x.Unsafe st.data.PendingSafeL2 = x.PendingSafe case engine.CrossUnsafeUpdateEvent: + st.log.Debug("Cross unsafe head updated", "cross_unsafe", x.CrossUnsafe, "local_unsafe", x.LocalUnsafe) st.data.CrossUnsafeL2 = x.CrossUnsafe st.data.UnsafeL2 = x.LocalUnsafe case engine.LocalSafeUpdateEvent: + st.log.Debug("Local safe head updated", "local_safe", x.Ref) st.data.LocalSafeL2 = x.Ref case engine.CrossSafeUpdateEvent: + st.log.Debug("Cross safe head updated", "cross_safe", x.CrossSafe, "local_safe", x.LocalSafe) st.data.SafeL2 = x.CrossSafe st.data.LocalSafeL2 = x.LocalSafe case derive.DeriverL1StatusEvent: diff --git a/op-supervisor/supervisor/backend/backend.go b/op-supervisor/supervisor/backend/backend.go index 1f020889f2f1..8216eaa9c0b5 100644 --- a/op-supervisor/supervisor/backend/backend.go +++ b/op-supervisor/supervisor/backend/backend.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "path/filepath" "sync/atomic" "time" @@ -18,7 +17,6 @@ import ( "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-supervisor/config" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db" - "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/source" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend" @@ -33,8 +31,6 @@ type SupervisorBackend struct { chainMonitors map[types.ChainID]*source.ChainMonitor db *db.ChainsDB - - maintenanceCancel context.CancelFunc } var _ frontend.Backend = (*SupervisorBackend)(nil) @@ -47,14 +43,8 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg return nil, err } - // create the head tracker - headTracker, err := heads.NewHeadTracker(logger, filepath.Join(cfg.Datadir, "heads.json")) - if err != nil { - return nil, fmt.Errorf("failed to load existing heads: %w", err) - } - // create the chains db - db := db.NewChainsDB(map[types.ChainID]db.LogStorage{}, headTracker, logger) + db := db.NewChainsDB(map[types.ChainID]db.LogStorage{}, logger) // create an empty map of chain monitors chainMonitors := make(map[types.ChainID]*source.ChainMonitor, len(cfg.L2RPCs)) @@ -145,10 +135,6 @@ func (su *SupervisorBackend) Start(ctx context.Context) error { return fmt.Errorf("failed to start chain monitor: %w", err) } } - // start db maintenance loop - maintenanceCtx, cancel := context.WithCancel(context.Background()) - su.db.StartCrossHeadMaintenance(maintenanceCtx) - su.maintenanceCancel = cancel return nil } @@ -158,8 +144,6 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error { if !su.started.CompareAndSwap(true, false) { return errAlreadyStopped } - // signal the maintenance loop to stop - su.maintenanceCancel() // collect errors from stopping chain monitors var errs error for _, monitor := range su.chainMonitors { @@ -200,24 +184,7 @@ func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHa if err != nil { return types.Invalid, fmt.Errorf("failed to check log: %w", err) } - safest := types.CrossUnsafe - // at this point we have the log entry, and we can check if it is safe by various criteria - for _, checker := range []db.SafetyChecker{ - db.NewSafetyChecker(db.Unsafe, su.db), - db.NewSafetyChecker(db.Safe, su.db), - db.NewSafetyChecker(db.Finalized, su.db), - } { - // check local safety limit first as it's more permissive - localPtr := checker.LocalHead(chainID) - if localPtr.WithinRange(blockNum, uint32(logIdx)) { - safest = checker.LocalSafetyLevel() - } - // check cross safety level - crossPtr := checker.CrossHead(chainID) - if crossPtr.WithinRange(blockNum, uint32(logIdx)) { - safest = checker.CrossSafetyLevel() - } - } + safest := su.db.Safest(chainID, blockNum, uint32(logIdx)) return safest, nil } @@ -243,7 +210,6 @@ func (su *SupervisorBackend) CheckMessages( // The block is considered safe if all logs in the block are safe // this is decided by finding the last log in the block and func (su *SupervisorBackend) CheckBlock(chainID *hexutil.U256, blockHash common.Hash, blockNumber hexutil.Uint64) (types.SafetyLevel, error) { - safest := types.CrossUnsafe // find the last log index in the block id := eth.BlockID{Hash: blockHash, Number: uint64(blockNumber)} _, err := su.db.FindSealedBlock(types.ChainID(*chainID), id) @@ -257,22 +223,6 @@ func (su *SupervisorBackend) CheckBlock(chainID *hexutil.U256, blockHash common. su.logger.Error("failed to scan block", "err", err) return "", err } - // at this point we have the extent of the block, and we can check if it is safe by various criteria - for _, checker := range []db.SafetyChecker{ - db.NewSafetyChecker(db.Unsafe, su.db), - db.NewSafetyChecker(db.Safe, su.db), - db.NewSafetyChecker(db.Finalized, su.db), - } { - // check local safety limit first as it's more permissive - localPtr := checker.LocalHead(types.ChainID(*chainID)) - if localPtr.IsSealed(uint64(blockNumber)) { - safest = checker.LocalSafetyLevel() - } - // check cross safety level - crossPtr := checker.CrossHead(types.ChainID(*chainID)) - if crossPtr.IsSealed(uint64(blockNumber)) { - safest = checker.CrossSafetyLevel() - } - } + safest := su.db.Safest(types.ChainID(*chainID), uint64(blockNumber), 0) return safest, nil } diff --git a/op-supervisor/supervisor/backend/db/db.go b/op-supervisor/supervisor/backend/db/db.go index 6c5e354dd0ab..8459266c0704 100644 --- a/op-supervisor/supervisor/backend/db/db.go +++ b/op-supervisor/supervisor/backend/db/db.go @@ -1,19 +1,17 @@ package db import ( - "context" "errors" "fmt" "io" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb" - "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/safety" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" ) @@ -49,39 +47,21 @@ type LogStorage interface { var _ LogStorage = (*logs.DB)(nil) -type HeadsStorage interface { - CrossUnsafe(id types.ChainID) heads.HeadPointer - CrossSafe(id types.ChainID) heads.HeadPointer - CrossFinalized(id types.ChainID) heads.HeadPointer - LocalUnsafe(id types.ChainID) heads.HeadPointer - LocalSafe(id types.ChainID) heads.HeadPointer - LocalFinalized(id types.ChainID) heads.HeadPointer - - UpdateCrossUnsafe(id types.ChainID, pointer heads.HeadPointer) error - UpdateCrossSafe(id types.ChainID, pointer heads.HeadPointer) error - UpdateCrossFinalized(id types.ChainID, pointer heads.HeadPointer) error - - UpdateLocalUnsafe(id types.ChainID, pointer heads.HeadPointer) error - UpdateLocalSafe(id types.ChainID, pointer heads.HeadPointer) error - UpdateLocalFinalized(id types.ChainID, pointer heads.HeadPointer) error -} - // ChainsDB is a database that stores logs and heads for multiple chains. // it implements the ChainsStorage interface. type ChainsDB struct { - logDBs map[types.ChainID]LogStorage - heads HeadsStorage - maintenanceReady chan struct{} - logger log.Logger + logDBs map[types.ChainID]LogStorage + safetyIndex safety.SafetyIndex + logger log.Logger } -func NewChainsDB(logDBs map[types.ChainID]LogStorage, heads HeadsStorage, l log.Logger) *ChainsDB { - return &ChainsDB{ - logDBs: logDBs, - heads: heads, - logger: l, - maintenanceReady: make(chan struct{}, 1), +func NewChainsDB(logDBs map[types.ChainID]LogStorage, l log.Logger) *ChainsDB { + ret := &ChainsDB{ + logDBs: logDBs, + logger: l, } + ret.safetyIndex = safety.NewSafetyIndex(l, ret) + return ret } func (db *ChainsDB) AddLogDB(chain types.ChainID, logDB LogStorage) { @@ -91,6 +71,14 @@ func (db *ChainsDB) AddLogDB(chain types.ChainID, logDB LogStorage) { db.logDBs[chain] = logDB } +func (db *ChainsDB) IteratorStartingAt(chain types.ChainID, sealedNum uint64, logIndex uint32) (logs.Iterator, error) { + logDB, ok := db.logDBs[chain] + if !ok { + return nil, fmt.Errorf("%w: %v", ErrUnknownChain, chain) + } + return logDB.IteratorStartingAt(sealedNum, logIndex) +} + // ResumeFromLastSealedBlock prepares the chains db to resume recording events after a restart. // It rewinds the database to the last block that is guaranteed to have been fully recorded to the database, // to ensure it can resume recording from the first log of the next block. @@ -110,187 +98,39 @@ func (db *ChainsDB) ResumeFromLastSealedBlock() error { return nil } -// StartCrossHeadMaintenance starts a background process that maintains the cross-heads of the chains -// for now it does not prevent multiple instances of this process from running -func (db *ChainsDB) StartCrossHeadMaintenance(ctx context.Context) { - go func() { - db.logger.Info("cross-head maintenance loop started") - // run the maintenance loop every 1 seconds for now - ticker := time.NewTicker(time.Second * 1) - for { - select { - case <-ctx.Done(): - db.logger.Warn("context cancelled, stopping maintenance loop") - return - case <-ticker.C: - db.logger.Debug("regular maintenance requested") - db.RequestMaintenance() - case <-db.maintenanceReady: - db.logger.Debug("running maintenance") - if err := db.updateAllHeads(); err != nil { - db.logger.Error("failed to update cross-heads", "err", err) - } - } - } - }() -} - // Check calls the underlying logDB to determine if the given log entry is safe with respect to the checker's criteria. -func (db *ChainsDB) Check(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) (entrydb.EntryIdx, error) { +func (db *ChainsDB) Check(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) (common.Hash, error) { logDB, ok := db.logDBs[chain] if !ok { - return 0, fmt.Errorf("%w: %v", ErrUnknownChain, chain) - } - return logDB.Contains(blockNum, logIdx, logHash) -} - -// RequestMaintenance requests that the maintenance loop update the cross-heads -// it does not block if maintenance is already scheduled -func (db *ChainsDB) RequestMaintenance() { - select { - case db.maintenanceReady <- struct{}{}: - return - default: - return + return common.Hash{}, fmt.Errorf("%w: %v", ErrUnknownChain, chain) } -} - -// updateAllHeads updates the cross-heads of all safety levels -// it is called by the maintenance loop -func (db *ChainsDB) updateAllHeads() error { - // create three safety checkers, one for each safety level - unsafeChecker := NewSafetyChecker(Unsafe, db) - safeChecker := NewSafetyChecker(Safe, db) - finalizedChecker := NewSafetyChecker(Finalized, db) - for _, checker := range []SafetyChecker{ - unsafeChecker, - safeChecker, - finalizedChecker} { - if err := db.UpdateCrossHeads(checker); err != nil { - return fmt.Errorf("failed to update cross-heads for safety level %s: %w", checker, err) - } + _, err := logDB.Contains(blockNum, logIdx, logHash) + if err != nil { + return common.Hash{}, err } - return nil + // TODO(#11693): need to get the actual block hash for this log entry for reorg detection + return common.Hash{}, nil } -// UpdateCrossHeadsForChain updates the cross-head for a single chain. -// the provided checker controls which heads are considered. -func (db *ChainsDB) UpdateCrossHeadsForChain(chainID types.ChainID, checker SafetyChecker) error { - // start with the xsafe head of the chain - xHead := checker.CrossHead(chainID) - // advance as far as the local head - localHead := checker.LocalHead(chainID) - // get an iterator for the next item - iter, err := db.logDBs[chainID].IteratorStartingAt(xHead.LastSealedBlockNum, xHead.LogsSince) - if err != nil { - return fmt.Errorf("failed to open iterator at sealed block %d logsSince %d for chain %v: %w", - xHead.LastSealedBlockNum, xHead.LogsSince, chainID, err) +// Safest returns the strongest safety level that can be guaranteed for the given log entry. +// it assumes the log entry has already been checked and is valid, this funcion only checks safety levels. +func (db *ChainsDB) Safest(chainID types.ChainID, blockNum uint64, index uint32) (safest types.SafetyLevel) { + safest = types.LocalUnsafe + if crossUnsafe, err := db.safetyIndex.CrossUnsafeL2(chainID); err == nil && crossUnsafe.WithinRange(blockNum, index) { + safest = types.CrossUnsafe } - // track if we updated the cross-head - updated := false - // advance the logDB through all executing messages we can - // this loop will break: - // - when we reach the local head - // - when we reach a message that is not safe - // - if an error occurs - for { - if err := iter.NextInitMsg(); errors.Is(err, logs.ErrFuture) { - // We ran out of events, but there can still be empty blocks. - // Take the last block we've processed, and try to update the x-head with it. - sealedBlockHash, sealedBlockNum, ok := iter.SealedBlock() - if !ok { - break - } - // We can only drop the logsSince value to 0 if the block is not seen. - if sealedBlockNum > xHead.LastSealedBlockNum { - // if we would exceed the local head, then abort - if !localHead.WithinRange(sealedBlockNum, 0) { - break - } - xHead = heads.HeadPointer{ - LastSealedBlockHash: sealedBlockHash, - LastSealedBlockNum: sealedBlockNum, - LogsSince: 0, - } - updated = true - } - break - } else if err != nil { - return fmt.Errorf("failed to read next executing message for chain %v: %w", chainID, err) - } - - sealedBlockHash, sealedBlockNum, ok := iter.SealedBlock() - if !ok { - break - } - _, logIdx, ok := iter.InitMessage() - if !ok { - break - } - // if we would exceed the local head, then abort - if !localHead.WithinRange(sealedBlockNum, logIdx) { - break - } - - // Check the executing message, if any - exec := iter.ExecMessage() - if exec != nil { - // Use the checker to determine if this message exists in the canonical chain, - // within the view of the checker's safety level - if err := checker.CheckCross( - types.ChainIDFromUInt64(uint64(exec.Chain)), - exec.BlockNum, - exec.LogIdx, - exec.Hash); err != nil { - if errors.Is(err, logs.ErrConflict) { - db.logger.Error("Bad executing message!", "err", err) - } else if errors.Is(err, logs.ErrFuture) { - db.logger.Warn("Executing message references future message", "err", err) - } else { - db.logger.Error("Failed to check executing message") - } - break - } - } - // if all is well, prepare the x-head update to this point - xHead = heads.HeadPointer{ - LastSealedBlockHash: sealedBlockHash, - LastSealedBlockNum: sealedBlockNum, - LogsSince: logIdx + 1, - } - updated = true + if localSafe, err := db.safetyIndex.LocalSafeL2(chainID); err == nil && localSafe.WithinRange(blockNum, index) { + safest = types.LocalSafe } - // if any chain was updated, we can trigger a maintenance request - // this allows for the maintenance loop to handle cascading updates - // instead of waiting for the next scheduled update - if updated { - db.logger.Info("Promoting cross-head", "chain", chainID, "head", xHead, "safety-level", checker.CrossSafetyLevel()) - err = checker.UpdateCross(chainID, xHead) - if err != nil { - return fmt.Errorf("failed to update cross-head for chain %v: %w", chainID, err) - } - db.RequestMaintenance() - } else { - db.logger.Debug("No cross-head update", "chain", chainID, "head", xHead, "safety-level", checker.CrossSafetyLevel()) + if crossSafe, err := db.safetyIndex.LocalSafeL2(chainID); err == nil && crossSafe.WithinRange(blockNum, index) { + safest = types.CrossSafe } - return nil -} - -func (db *ChainsDB) Heads() HeadsStorage { - return db.heads -} - -// UpdateCrossHeads updates the cross-heads of all chains -// based on the provided SafetyChecker. The SafetyChecker is used to determine -// the safety of each log entry in the database, and the cross-head associated with it. -func (db *ChainsDB) UpdateCrossHeads(checker SafetyChecker) error { - for chainID := range db.logDBs { - err := db.UpdateCrossHeadsForChain(chainID, checker) - if err != nil { - return err + if finalized, err := db.safetyIndex.FinalizedL2(chainID); err == nil { + if finalized.Number >= blockNum { + safest = types.Finalized } } - return nil + return } func (db *ChainsDB) FindSealedBlock(chain types.ChainID, block eth.BlockID) (nextEntry entrydb.EntryIdx, err error) { @@ -312,20 +152,35 @@ func (db *ChainsDB) LatestBlockNum(chain types.ChainID) (num uint64, ok bool) { return logDB.LatestSealedBlockNum() } -func (db *ChainsDB) SealBlock(chain types.ChainID, parentHash common.Hash, block eth.BlockID, timestamp uint64) error { +func (db *ChainsDB) AddLog( + chain types.ChainID, + logHash common.Hash, + parentBlock eth.BlockID, + logIdx uint32, + execMsg *types.ExecutingMessage) error { logDB, ok := db.logDBs[chain] if !ok { return fmt.Errorf("%w: %v", ErrUnknownChain, chain) } - return logDB.SealBlock(parentHash, block, timestamp) + return logDB.AddLog(logHash, parentBlock, logIdx, execMsg) } -func (db *ChainsDB) AddLog(chain types.ChainID, logHash common.Hash, parentBlock eth.BlockID, logIdx uint32, execMsg *types.ExecutingMessage) error { +func (db *ChainsDB) SealBlock( + chain types.ChainID, + block eth.L2BlockRef) error { logDB, ok := db.logDBs[chain] if !ok { return fmt.Errorf("%w: %v", ErrUnknownChain, chain) } - return logDB.AddLog(logHash, parentBlock, logIdx, execMsg) + err := logDB.SealBlock(block.ParentHash, block.ID(), block.Time) + if err != nil { + return fmt.Errorf("failed to seal block %v: %w", block, err) + } + err = db.safetyIndex.UpdateLocalUnsafe(chain, block) + if err != nil { + return fmt.Errorf("failed to update local-unsafe: %w", err) + } + return nil } func (db *ChainsDB) Rewind(chain types.ChainID, headBlockNum uint64) error { diff --git a/op-supervisor/supervisor/backend/db/heads/types.go b/op-supervisor/supervisor/backend/db/heads/types.go index 3e54593e33c7..7db0bff2d106 100644 --- a/op-supervisor/supervisor/backend/db/heads/types.go +++ b/op-supervisor/supervisor/backend/db/heads/types.go @@ -13,6 +13,7 @@ type HeadPointer struct { // LastSealedBlockHash is the last fully-processed block LastSealedBlockHash common.Hash LastSealedBlockNum uint64 + LastSealedTimestamp uint64 // Number of logs that have been verified since the LastSealedBlock. // These logs are contained in the block that builds on top of the LastSealedBlock. diff --git a/op-supervisor/supervisor/backend/db/logs/iterator.go b/op-supervisor/supervisor/backend/db/logs/iterator.go index 4b3bd1b65908..f9e65c41e890 100644 --- a/op-supervisor/supervisor/backend/db/logs/iterator.go +++ b/op-supervisor/supervisor/backend/db/logs/iterator.go @@ -8,11 +8,13 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" ) type IteratorState interface { NextIndex() entrydb.EntryIdx + HeadPointer() (heads.HeadPointer, error) SealedBlock() (hash common.Hash, num uint64, ok bool) InitMessage() (hash common.Hash, logIndex uint32, ok bool) ExecMessage() *types.ExecutingMessage @@ -23,6 +25,7 @@ type Iterator interface { NextInitMsg() error NextExecMsg() error NextBlock() error + TraverseConditional(traverseConditionalFn) error IteratorState } @@ -32,6 +35,8 @@ type iterator struct { entriesRead int64 } +type traverseConditionalFn func(state IteratorState) error + // End traverses the iterator to the end of the DB. // It does not return io.EOF or ErrFuture. func (i *iterator) End() error { @@ -105,6 +110,25 @@ func (i *iterator) NextBlock() error { } } +func (i *iterator) TraverseConditional(fn traverseConditionalFn) error { + var snapshot logContext + for { + snapshot = i.current // copy the iterator state + _, err := i.next() + if err != nil { + i.current = snapshot + return err + } + if i.current.need != 0 { // skip intermediate states + continue + } + if err := fn(&i.current); err != nil { + i.current = snapshot + return err + } + } +} + // Read and apply the next entry. func (i *iterator) next() (entrydb.EntryType, error) { index := i.current.nextEntryIndex @@ -142,3 +166,7 @@ func (i *iterator) InitMessage() (hash common.Hash, logIndex uint32, ok bool) { func (i *iterator) ExecMessage() *types.ExecutingMessage { return i.current.ExecMessage() } + +func (i *iterator) HeadPointer() (heads.HeadPointer, error) { + return i.current.HeadPointer() +} diff --git a/op-supervisor/supervisor/backend/db/logs/state.go b/op-supervisor/supervisor/backend/db/logs/state.go index bb00762acc2e..df63f96e3599 100644 --- a/op-supervisor/supervisor/backend/db/logs/state.go +++ b/op-supervisor/supervisor/backend/db/logs/state.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" ) @@ -126,6 +127,18 @@ func (l *logContext) ExecMessage() *types.ExecutingMessage { return nil } +func (l *logContext) HeadPointer() (heads.HeadPointer, error) { + if l.need != 0 { + return heads.HeadPointer{}, errors.New("cannot provide head pointer while state is incomplete") + } + return heads.HeadPointer{ + LastSealedBlockHash: l.blockHash, + LastSealedBlockNum: l.blockNum, + LastSealedTimestamp: l.timestamp, + LogsSince: l.logsSince, + }, nil +} + // ApplyEntry applies an entry on top of the current state. func (l *logContext) ApplyEntry(entry entrydb.Entry) error { // Wrap processEntry to add common useful error message info diff --git a/op-supervisor/supervisor/backend/db/safety_checkers.go b/op-supervisor/supervisor/backend/db/safety_checkers.go deleted file mode 100644 index cbf4e3ddd6d7..000000000000 --- a/op-supervisor/supervisor/backend/db/safety_checkers.go +++ /dev/null @@ -1,153 +0,0 @@ -package db - -import ( - "fmt" - - "github.com/ethereum/go-ethereum/common" - - "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads" - "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs" - "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" -) - -const ( - Unsafe = "unsafe" - Safe = "safe" - Finalized = "finalized" -) - -// SafetyChecker is an interface for checking the safety of a log entry -// it maintains a consistent view between local and cross chain for a given safety level -type SafetyChecker interface { - LocalHead(chainID types.ChainID) heads.HeadPointer - CrossHead(chainID types.ChainID) heads.HeadPointer - CheckLocal(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) error - CheckCross(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) error - UpdateLocal(chain types.ChainID, pointer heads.HeadPointer) error - UpdateCross(chain types.ChainID, pointer heads.HeadPointer) error - String() string - LocalSafetyLevel() types.SafetyLevel - CrossSafetyLevel() types.SafetyLevel -} - -// NewSafetyChecker creates a new SafetyChecker of the given type -func NewSafetyChecker(t types.SafetyLevel, chainsDB *ChainsDB) SafetyChecker { - return NewChecker(t, chainsDB) -} - -// check checks if the log entry is safe, provided a local head for the chain -// it is used by the individual SafetyCheckers to determine if a log entry is safe -func check( - chainsDB *ChainsDB, - head heads.HeadPointer, - chain types.ChainID, - blockNum uint64, - logIdx uint32, - logHash common.Hash) error { - - // for the Check to be valid, the log must: - // 1. have the expected logHash at the indicated blockNum and logIdx - _, err := chainsDB.logDBs[chain].Contains(blockNum, logIdx, logHash) - if err != nil { - return err - } - // 2. be within the range of the given head - if !head.WithinRange(blockNum, logIdx) { - return logs.ErrFuture - } - return nil -} - -// checker is a composition of accessor and update functions for a given safety level. -// they implement the SafetyChecker interface. -// checkers can be made with NewChecker. -type checker struct { - chains *ChainsDB - localSafety types.SafetyLevel - crossSafety types.SafetyLevel - updateCross func(chain types.ChainID, pointer heads.HeadPointer) error - updateLocal func(chain types.ChainID, pointer heads.HeadPointer) error - localHead func(chain types.ChainID) heads.HeadPointer - crossHead func(chain types.ChainID) heads.HeadPointer - checkCross func(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) error - checkLocal func(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) error -} - -func (c *checker) String() string { - return fmt.Sprintf("%s+%s", c.localSafety.String(), c.crossSafety.String()) -} - -func (c *checker) LocalSafetyLevel() types.SafetyLevel { - return c.localSafety -} - -func (c *checker) CrossSafetyLevel() types.SafetyLevel { - return c.crossSafety -} - -func (c *checker) UpdateCross(chain types.ChainID, pointer heads.HeadPointer) error { - return c.updateCross(chain, pointer) -} -func (c *checker) UpdateLocal(chain types.ChainID, pointer heads.HeadPointer) error { - return c.updateLocal(chain, pointer) -} -func (c *checker) LocalHead(chain types.ChainID) heads.HeadPointer { - return c.localHead(chain) -} -func (c *checker) CrossHead(chain types.ChainID) heads.HeadPointer { - return c.crossHead(chain) -} -func (c *checker) CheckCross(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) error { - return c.checkCross(chain, blockNum, logIdx, logHash) -} -func (c *checker) CheckLocal(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) error { - return c.checkLocal(chain, blockNum, logIdx, logHash) -} - -func NewChecker(t types.SafetyLevel, c *ChainsDB) SafetyChecker { - // checkWith creates a function which takes a chain-getter and returns a function that returns the head for the chain - checkWith := func(getHead func(chain types.ChainID) heads.HeadPointer) func(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) error { - return func(chain types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) error { - return check(c, getHead(chain), chain, blockNum, logIdx, logHash) - } - } - switch t { - case Unsafe: - return &checker{ - chains: c, - localSafety: types.LocalUnsafe, - crossSafety: types.CrossUnsafe, - updateCross: c.heads.UpdateCrossUnsafe, - updateLocal: c.heads.UpdateLocalUnsafe, - crossHead: c.heads.CrossUnsafe, - localHead: c.heads.LocalUnsafe, - checkCross: checkWith(c.heads.CrossUnsafe), - checkLocal: checkWith(c.heads.LocalUnsafe), - } - case Safe: - return &checker{ - chains: c, - localSafety: types.LocalSafe, - crossSafety: types.CrossSafe, - updateCross: c.heads.UpdateCrossSafe, - updateLocal: c.heads.UpdateLocalSafe, - crossHead: c.heads.CrossSafe, - localHead: c.heads.LocalSafe, - checkCross: checkWith(c.heads.CrossSafe), - checkLocal: checkWith(c.heads.LocalSafe), - } - case Finalized: - return &checker{ - chains: c, - localSafety: types.Finalized, - crossSafety: types.Finalized, - updateCross: c.heads.UpdateCrossFinalized, - updateLocal: c.heads.UpdateLocalFinalized, - crossHead: c.heads.CrossFinalized, - localHead: c.heads.LocalFinalized, - checkCross: checkWith(c.heads.CrossFinalized), - checkLocal: checkWith(c.heads.LocalFinalized), - } - } - return &checker{} -} diff --git a/op-supervisor/supervisor/backend/db/safety_checkers_test.go b/op-supervisor/supervisor/backend/db/safety_checkers_test.go deleted file mode 100644 index fa0954bc6b65..000000000000 --- a/op-supervisor/supervisor/backend/db/safety_checkers_test.go +++ /dev/null @@ -1,215 +0,0 @@ -package db - -/* -import ( - "errors" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" - - "github.com/ethereum-optimism/optimism/op-service/testlog" - "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb" - "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads" - "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs" - "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" -) - -// TestHeadsForChain tests the heads for a chain, -// confirming the Unsafe, Safe and Finalized all return the correct head for the chain. -// and confirming that the chainID matters when finding the value -func TestHeadsForChain(t *testing.T) { - h := heads.NewHeads() - chainHeads := heads.ChainHeads{ - Unsafe: entrydb.EntryIdx(1), - CrossUnsafe: entrydb.EntryIdx(2), - LocalSafe: entrydb.EntryIdx(3), - CrossSafe: entrydb.EntryIdx(4), - LocalFinalized: entrydb.EntryIdx(5), - CrossFinalized: entrydb.EntryIdx(6), - } - h.Put(types.ChainIDFromUInt64(1), chainHeads) - chainsDB := NewChainsDB(nil, &stubHeadStorage{h}, testlog.Logger(t, log.LevelDebug)) - tcases := []struct { - name string - chainID types.ChainID - checkerType types.SafetyLevel - expectedLocal entrydb.EntryIdx - expectedCross entrydb.EntryIdx - }{ - { - "Unsafe Head", - types.ChainIDFromUInt64(1), - Unsafe, - entrydb.EntryIdx(1), - entrydb.EntryIdx(2), - }, - { - "Safe Head", - types.ChainIDFromUInt64(1), - Safe, - entrydb.EntryIdx(3), - entrydb.EntryIdx(4), - }, - { - "Finalized Head", - types.ChainIDFromUInt64(1), - Finalized, - entrydb.EntryIdx(5), - entrydb.EntryIdx(6), - }, - { - "Incorrect Chain", - types.ChainIDFromUInt64(100), - Safe, - entrydb.EntryIdx(0), - entrydb.EntryIdx(0), - }, - } - - for _, c := range tcases { - t.Run(c.name, func(t *testing.T) { - checker := NewSafetyChecker(c.checkerType, chainsDB) - localHead := checker.LocalHeadForChain(c.chainID) - crossHead := checker.CrossHeadForChain(c.chainID) - require.Equal(t, c.expectedLocal, localHead) - require.Equal(t, c.expectedCross, crossHead) - }) - } -} - -func TestCheck(t *testing.T) { - h := heads.NewHeads() - chainHeads := heads.ChainHeads{ - Unsafe: entrydb.EntryIdx(6), - CrossUnsafe: entrydb.EntryIdx(5), - LocalSafe: entrydb.EntryIdx(4), - CrossSafe: entrydb.EntryIdx(3), - LocalFinalized: entrydb.EntryIdx(2), - CrossFinalized: entrydb.EntryIdx(1), - } - h.Put(types.ChainIDFromUInt64(1), chainHeads) - - // the logStore contains just a single stubbed log DB - logDB := &stubLogDB{} - logsStore := map[types.ChainID]LogStorage{ - types.ChainIDFromUInt64(1): logDB, - } - - chainsDB := NewChainsDB(logsStore, &stubHeadStorage{h}, testlog.Logger(t, log.LevelDebug)) - - tcases := []struct { - name string - checkerType types.SafetyLevel - chainID types.ChainID - blockNum uint64 - logIdx uint32 - loghash common.Hash - containsResponse containsResponse - expected bool - }{ - { - // confirm that checking Unsafe uses the unsafe head, - // and that we can find logs even *at* the unsafe head index - "Unsafe Log at Head", - Unsafe, - types.ChainIDFromUInt64(1), - 1, - 1, - common.Hash{1, 2, 3}, - containsResponse{entrydb.EntryIdx(6), nil}, - true, - }, - { - // confirm that checking the Safe head works - "Safe Log", - Safe, - types.ChainIDFromUInt64(1), - 1, - 1, - common.Hash{1, 2, 3}, - containsResponse{entrydb.EntryIdx(3), nil}, - true, - }, - { - // confirm that checking the Finalized head works - "Finalized Log", - Finalized, - types.ChainIDFromUInt64(1), - 1, - 1, - common.Hash{1, 2, 3}, - containsResponse{entrydb.EntryIdx(1), nil}, - true, - }, - { - // confirm that when exists is false, we return false - "Does not Exist", - Safe, - types.ChainIDFromUInt64(1), - 1, - 1, - common.Hash{1, 2, 3}, - containsResponse{entrydb.EntryIdx(1), logs.ErrConflict}, - false, - }, - { - // confirm that when a head is out of range, we return false - "Unsafe Out of Range", - Unsafe, - types.ChainIDFromUInt64(1), - 1, - 1, - common.Hash{1, 2, 3}, - containsResponse{entrydb.EntryIdx(100), nil}, - false, - }, - { - // confirm that when a head is out of range, we return false - "Safe Out of Range", - Safe, - types.ChainIDFromUInt64(1), - 1, - 1, - common.Hash{1, 2, 3}, - containsResponse{entrydb.EntryIdx(5), nil}, - false, - }, - { - // confirm that when a head is out of range, we return false - "Finalized Out of Range", - Finalized, - types.ChainIDFromUInt64(1), - 1, - 1, - common.Hash{1, 2, 3}, - containsResponse{entrydb.EntryIdx(3), nil}, - false, - }, - { - // confirm that when Contains returns an error, we return false - "Error", - Safe, - types.ChainIDFromUInt64(1), - 1, - 1, - common.Hash{1, 2, 3}, - containsResponse{entrydb.EntryIdx(0), errors.New("error")}, - false, - }, - } - - for _, c := range tcases { - t.Run(c.name, func(t *testing.T) { - // rig the logStore to return the expected response - logDB.containsResponse = c.containsResponse - checker := NewSafetyChecker(c.checkerType, chainsDB) - r := checker.Check(c.chainID, c.blockNum, c.logIdx, c.loghash) - // confirm that the expected outcome is correct - require.Equal(t, c.expected, r) - }) - } -} -*/ diff --git a/op-supervisor/supervisor/backend/safety/safety.go b/op-supervisor/supervisor/backend/safety/safety.go new file mode 100644 index 000000000000..c7828336ba57 --- /dev/null +++ b/op-supervisor/supervisor/backend/safety/safety.go @@ -0,0 +1,270 @@ +package safety + +import ( + "fmt" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" +) + +type SafetyIndex interface { + // Updaters for the latest local safety status of each chain + UpdateLocalUnsafe(chainID types.ChainID, ref eth.L2BlockRef) error + UpdateLocalSafe(chainID types.ChainID, at eth.L1BlockRef, ref eth.L2BlockRef) error + UpdateFinalizeL1(ref eth.L1BlockRef) error + + // Getters for the latest safety status of each chain + UnsafeL2(chainID types.ChainID) (heads.HeadPointer, error) + CrossUnsafeL2(chainID types.ChainID) (heads.HeadPointer, error) + LocalSafeL2(chainID types.ChainID) (heads.HeadPointer, error) + CrossSafeL2(chainID types.ChainID) (heads.HeadPointer, error) + // We only finalize on full L2 block boundaries, hence not a heads.HeadPointer return. + FinalizedL2(chainId types.ChainID) (eth.BlockID, error) +} + +type ChainsDBClient interface { + IteratorStartingAt(chainID types.ChainID, sealedNum uint64, logIndex uint32) (logs.Iterator, error) + Check(chainID types.ChainID, blockNum uint64, logIdx uint32, logHash common.Hash) (h common.Hash, err error) +} + +type safetyIndex struct { + log log.Logger + + chains ChainsDBClient + + unsafe map[types.ChainID]*View + safe map[types.ChainID]*View + finalized map[types.ChainID]eth.BlockID + + // remember what each non-finalized L2 block is derived from + derivedFrom map[types.ChainID]map[common.Hash]eth.L1BlockRef + + // the last received L1 finality signal. + finalizedL1 eth.L1BlockRef +} + +func NewSafetyIndex(log log.Logger, chains ChainsDBClient) *safetyIndex { + return &safetyIndex{ + log: log, + chains: chains, + unsafe: make(map[types.ChainID]*View), + safe: make(map[types.ChainID]*View), + finalized: make(map[types.ChainID]eth.BlockID), + derivedFrom: make(map[types.ChainID]map[common.Hash]eth.L1BlockRef), + } +} + +// UpdateLocalUnsafe updates the local-unsafe view for the given chain, and advances the cross-unsafe status. +func (r *safetyIndex) UpdateLocalUnsafe(chainID types.ChainID, ref eth.L2BlockRef) error { + view, ok := r.safe[chainID] + if !ok { + iter, err := r.chains.IteratorStartingAt(chainID, ref.Number, 0) + if err != nil { + return fmt.Errorf("failed to open iterator for chain %s block %d", chainID, ref.Number) + } + view = &View{ + chainID: chainID, + iter: iter, + localView: heads.HeadPointer{ + LastSealedBlockHash: ref.Hash, + LastSealedBlockNum: ref.Number, + LastSealedTimestamp: ref.Time, + LogsSince: 0, + }, + localDerivedFrom: eth.L1BlockRef{}, + validWithinView: r.ValidWithinUnsafeView, + } + r.unsafe[chainID] = view + } else if err := view.UpdateLocal(eth.L1BlockRef{}, ref); err != nil { + return fmt.Errorf("failed to update local-unsafe: %w", err) + } + local, _ := r.unsafe[chainID].Local() + r.log.Debug("Updated local unsafe head", "chainID", chainID, "local", local) + r.advanceCrossUnsafe() + return nil +} + +// advanceCrossUnsafe calls Process on all cross-unsafe views. +func (r *safetyIndex) advanceCrossUnsafe() { + for chainID, view := range r.unsafe { + if err := view.Process(); err != nil { + r.log.Error("Failed to update cross-unsafe view", "chain", chainID, "err", err) + } + cross, _ := r.unsafe[chainID].Cross() + r.log.Debug("Updated cross unsafe head", "chainID", chainID, "cross", cross) + } +} + +// UpdateLocalSafe updates the local-safe view for the given chain, and advances the cross-safe status. +func (r *safetyIndex) UpdateLocalSafe( + chainID types.ChainID, at eth.L1BlockRef, ref eth.L2BlockRef) error { + view, ok := r.safe[chainID] + if !ok { + iter, err := r.chains.IteratorStartingAt(chainID, ref.Number, 0) + if err != nil { + return fmt.Errorf("failed to open iterator for chain %s block %d", chainID, ref.Number) + } + view = &View{ + chainID: chainID, + iter: iter, + localView: heads.HeadPointer{ + LastSealedBlockHash: ref.Hash, + LastSealedBlockNum: ref.Number, + LastSealedTimestamp: ref.Time, + LogsSince: 0, + }, + localDerivedFrom: at, + validWithinView: r.ValidWithinSafeView, + } + r.safe[chainID] = view + } else if err := view.UpdateLocal(at, ref); err != nil { + return fmt.Errorf("failed to update local-safe: %w", err) + } + + // register what this L2 block is derived from + m, ok := r.derivedFrom[chainID] + if !ok { + m = make(map[common.Hash]eth.L1BlockRef) + r.derivedFrom[chainID] = m + } + m[ref.Hash] = at + local, _ := r.safe[chainID].Local() + r.log.Debug("Updated local safe head", "chainID", chainID, "local", local) + r.advanceCrossSafe() + return nil +} + +// advanceCrossSafe calls Process on all cross-safe views, and advances the finalized safety status. +func (r *safetyIndex) advanceCrossSafe() { + for chainID, view := range r.safe { + if err := view.Process(); err != nil { + r.log.Error("Failed to update cross-safe view", "chain", chainID, "err", err) + } + cross, _ := r.safe[chainID].Cross() + r.log.Debug("Updated local safe head", "chainID", chainID, "cross", cross) + } + r.advanceFinalized() +} + +// UpdateFinalizeL1 updates the finalized L1 block, and advances the finalized safety status. +func (r *safetyIndex) UpdateFinalizeL1(ref eth.L1BlockRef) error { + if ref.Number <= r.finalizedL1.Number { + return fmt.Errorf("ignoring old L1 finality signal of %s, already have %s", ref, r.finalizedL1) + } + r.finalizedL1 = ref + r.log.Debug("Updated L1 finalized head", "L1finalized", ref) + r.advanceFinalized() + return nil +} + +// advanceFinalized should be called whenever the finalized L1 block, or the cross-safe history, changes. +// This then promotes the irreversible cross-safe L2 blocks to a finalized safety status. +func (r *safetyIndex) advanceFinalized() { + // Whatever was considered cross-safe at the finalized block-height can + // now be considered finalized, since the inputs have become irreversible. + for chainID, view := range r.safe { + crossSafe, err := view.Cross() + if err != nil { + r.log.Info("Failed to get cross-safe data, cannot finalize", "chain", chainID, "err", err) + continue + } + // TODO(#12184): we need to consider older cross-safe data, + // if we want to finalize something at all on longer lagging finality signal. + // Could consider just iterating over all derivedFrom contents? + l1Dep := r.derivedFrom[chainID][crossSafe.LastSealedBlockHash] + if l1Dep.Number < r.finalizedL1.Number { + r.finalized[chainID] = eth.BlockID{Hash: crossSafe.LastSealedBlockHash, Number: crossSafe.LastSealedBlockNum} + finalized := r.finalized[chainID] + r.log.Debug("Updated finalized head", "chainID", chainID, "finalized", finalized) + } + } +} + +// UnsafeL2 returns the latest unsafe L2 block of the given chain. +func (r *safetyIndex) UnsafeL2(chainID types.ChainID) (heads.HeadPointer, error) { + view, ok := r.unsafe[chainID] + if !ok { + return heads.HeadPointer{}, fmt.Errorf("no unsafe data for chain %s", chainID) + } + return view.Local() +} + +// CrossUnsafeL2 returns the latest cross-unsafe L2 block of the given chain. +func (r *safetyIndex) CrossUnsafeL2(chainID types.ChainID) (heads.HeadPointer, error) { + view, ok := r.unsafe[chainID] + if !ok { + return heads.HeadPointer{}, fmt.Errorf("no cross-unsafe data for chain %s", chainID) + } + return view.Cross() +} + +// LocalSafeL2 returns the latest local-safe L2 block of the given chain. +func (r *safetyIndex) LocalSafeL2(chainID types.ChainID) (heads.HeadPointer, error) { + view, ok := r.safe[chainID] + if !ok { + return heads.HeadPointer{}, fmt.Errorf("no local-safe data for chain %s", chainID) + } + return view.Local() +} + +// CrossSafeL2 returns the latest cross-safe L2 block of the given chain. +func (r *safetyIndex) CrossSafeL2(chainID types.ChainID) (heads.HeadPointer, error) { + view, ok := r.safe[chainID] + if !ok { + return heads.HeadPointer{}, fmt.Errorf("no cross-safe data for chain %s", chainID) + } + return view.Cross() +} + +// FinalizedL2 returns the latest finalized L2 block of the given chain. +func (r *safetyIndex) FinalizedL2(chainId types.ChainID) (eth.BlockID, error) { + finalized, ok := r.finalized[chainId] + if !ok { + return eth.BlockID{}, fmt.Errorf("not seen finalized data of chain %s at finalized L1 block %s", chainId, r.finalizedL1) + } + return finalized, nil +} + +// ValidWithinUnsafeView checks if the given executing message is in the database. +// unsafe view is meant to represent all of the database, and so no boundary checks are needed. +func (r *safetyIndex) ValidWithinUnsafeView(_ uint64, execMsg *types.ExecutingMessage) error { + execChainID := types.ChainIDFromUInt64(uint64(execMsg.Chain)) + _, err := r.chains.Check(execChainID, execMsg.BlockNum, execMsg.LogIdx, execMsg.Hash) + return err +} + +// ValidWithinSafeView checks if the given executing message is within the database, +// and within the L1 view of the caller. +func (r *safetyIndex) ValidWithinSafeView(l1View uint64, execMsg *types.ExecutingMessage) error { + execChainID := types.ChainIDFromUInt64(uint64(execMsg.Chain)) + + // Check that the initiating message, which was pulled in by the executing message, + // does indeed exist. And in which L2 block it exists (if any). + l2BlockHash, err := r.chains.Check(execChainID, execMsg.BlockNum, execMsg.LogIdx, execMsg.Hash) + if err != nil { + return err + } + // if the executing message falls within the execFinalized range, then nothing to check + execFinalized, ok := r.finalized[execChainID] + if ok && execFinalized.Number > execMsg.BlockNum { + return nil + } + // check if the L1 block of the executing message is known + execL1Block, ok := r.derivedFrom[execChainID][l2BlockHash] + if !ok { + return logs.ErrFuture // TODO(#12185) need to distinguish between same-data future, and new-data future + } + // check if the L1 block is within the view + if execL1Block.Number > l1View { + return fmt.Errorf("exec message depends on L2 block %s:%d, derived from L1 block %s, not within view yet: %w", + l2BlockHash, execMsg.BlockNum, execL1Block, logs.ErrFuture) + } + return nil +} + +var _ SafetyIndex = (*safetyIndex)(nil) diff --git a/op-supervisor/supervisor/backend/safety/views.go b/op-supervisor/supervisor/backend/safety/views.go new file mode 100644 index 000000000000..c9393758fad5 --- /dev/null +++ b/op-supervisor/supervisor/backend/safety/views.go @@ -0,0 +1,91 @@ +package safety + +import ( + "errors" + + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/heads" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" +) + +type View struct { + chainID types.ChainID + + iter logs.Iterator + + localView heads.HeadPointer + localDerivedFrom eth.L1BlockRef + + validWithinView func(l1View uint64, execMsg *types.ExecutingMessage) error +} + +func (vi *View) Cross() (heads.HeadPointer, error) { + return vi.iter.HeadPointer() +} + +func (vi *View) Local() (heads.HeadPointer, error) { + if vi.localView == (heads.HeadPointer{}) { + return heads.HeadPointer{}, logs.ErrFuture + } + return vi.localView, nil +} + +func (vi *View) UpdateLocal(at eth.L1BlockRef, ref eth.L2BlockRef) error { + vi.localView = heads.HeadPointer{ + LastSealedBlockHash: ref.Hash, + LastSealedBlockNum: ref.Number, + //LastSealedTimestamp: ref.Time, + LogsSince: 0, + } + vi.localDerivedFrom = at + + // TODO(#11693): reorg check against existing DB + // TODO(#12186): localView may be larger than what DB contents we have + return nil +} + +func (vi *View) Process() error { + err := vi.iter.TraverseConditional(func(state logs.IteratorState) error { + hash, num, ok := state.SealedBlock() + if !ok { + return logs.ErrFuture // maybe a more specific error for no-genesis case? + } + // TODO(#11693): reorg check in the future. To make sure that what we traverse is still canonical. + _ = hash + // check if L2 block is within view + if !vi.localView.WithinRange(num, 0) { + return logs.ErrFuture + } + _, initLogIndex, ok := state.InitMessage() + if !ok { + return nil // no readable message, just an empty block + } + // check if the message is within view + if !vi.localView.WithinRange(num, initLogIndex) { + return logs.ErrFuture + } + // check if it is an executing message. If so, check the dependency + if execMsg := state.ExecMessage(); execMsg == nil { + // Check if executing message is within cross L2 view, + // relative to the L1 view of current message. + // And check if the message is valid to execute at all + // (i.e. if it exists on the initiating side). + // TODO(#12187): it's inaccurate to check with the view of the local-unsafe + // it should be limited to the L1 view at the time of the inclusion of execution of the message. + err := vi.validWithinView(vi.localDerivedFrom.Number, execMsg) + if err != nil { + return err + } + } + return nil + }) + if err == nil { + panic("expected reader to complete with an exit-error") + } + if errors.Is(err, logs.ErrFuture) { + // register the new cross-safe block as cross-safe up to the current L1 view + return nil + } + return err +} diff --git a/op-supervisor/supervisor/backend/source/chain.go b/op-supervisor/supervisor/backend/source/chain.go index 03286b1a4160..383a5fb74de8 100644 --- a/op-supervisor/supervisor/backend/source/chain.go +++ b/op-supervisor/supervisor/backend/source/chain.go @@ -10,7 +10,6 @@ import ( "github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/sources" "github.com/ethereum-optimism/optimism/op-service/sources/caching" - "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" ) @@ -25,8 +24,7 @@ type Metrics interface { } type Storage interface { - LogStorage - Heads() db.HeadsStorage + ChainsDBClientForLogProcessor DatabaseRewinder LatestBlockNum(chainID types.ChainID) (num uint64, ok bool) } @@ -50,16 +48,9 @@ func NewChainMonitor(ctx context.Context, logger log.Logger, m Metrics, chainID processLogs := newLogProcessor(chainID, store) unsafeBlockProcessor := NewChainProcessor(logger, cl, chainID, processLogs, store) - // create head processors which only update the head - unsafeHeadProcessor := OnNewHead(chainID, store.Heads().UpdateLocalUnsafe) - safeHeadProcessor := OnNewHead(chainID, store.Heads().UpdateLocalSafe) - finalizedHeadProcessor := OnNewHead(chainID, store.Heads().UpdateLocalFinalized) + unsafeProcessors := []HeadProcessor{unsafeBlockProcessor} - unsafeProcessors := []HeadProcessor{unsafeBlockProcessor, unsafeHeadProcessor} - safeProcessors := []HeadProcessor{safeHeadProcessor} - finalizedProcessors := []HeadProcessor{finalizedHeadProcessor} - - callback := newHeadUpdateProcessor(logger, unsafeProcessors, safeProcessors, finalizedProcessors) + callback := newHeadUpdateProcessor(logger, unsafeProcessors, nil, nil) headMonitor := NewHeadMonitor(logger, epochPollInterval, cl, callback) return &ChainMonitor{ diff --git a/op-supervisor/supervisor/backend/source/chain_processor.go b/op-supervisor/supervisor/backend/source/chain_processor.go index 4c7895b0cdf3..60568fe296fb 100644 --- a/op-supervisor/supervisor/backend/source/chain_processor.go +++ b/op-supervisor/supervisor/backend/source/chain_processor.go @@ -21,7 +21,7 @@ type Source interface { } type LogProcessor interface { - ProcessLogs(ctx context.Context, block eth.L1BlockRef, receipts gethtypes.Receipts) error + ProcessLogs(ctx context.Context, block eth.L2BlockRef, receipts gethtypes.Receipts) error } type DatabaseRewinder interface { @@ -130,7 +130,13 @@ func (s *ChainProcessor) worker() { func (s *ChainProcessor) update(nextNum uint64) error { ctx, cancel := context.WithTimeout(s.ctx, time.Second*10) - next, err := s.client.L1BlockRefByNumber(ctx, nextNum) + nextL1, err := s.client.L1BlockRefByNumber(ctx, nextNum) + next := eth.L2BlockRef{ + Hash: nextL1.Hash, + ParentHash: nextL1.ParentHash, + Number: nextL1.Number, + Time: nextL1.Time, + } cancel() if err != nil { return fmt.Errorf("failed to fetch next block: %w", err) diff --git a/op-supervisor/supervisor/backend/source/log_processor.go b/op-supervisor/supervisor/backend/source/log_processor.go index 1c20f8c4530a..8a815f7ca9e9 100644 --- a/op-supervisor/supervisor/backend/source/log_processor.go +++ b/op-supervisor/supervisor/backend/source/log_processor.go @@ -15,7 +15,12 @@ import ( ) type LogStorage interface { - SealBlock(chain types.ChainID, parentHash common.Hash, block eth.BlockID, timestamp uint64) error + SealBlock(chain types.ChainID, block eth.L2BlockRef) error + AddLog(chain types.ChainID, logHash common.Hash, parentBlock eth.BlockID, logIdx uint32, execMsg *types.ExecutingMessage) error +} + +type ChainsDBClientForLogProcessor interface { + SealBlock(chain types.ChainID, block eth.L2BlockRef) error AddLog(chain types.ChainID, logHash common.Hash, parentBlock eth.BlockID, logIdx uint32, execMsg *types.ExecutingMessage) error } @@ -39,7 +44,7 @@ func newLogProcessor(chain types.ChainID, logStore LogStorage) *logProcessor { // ProcessLogs processes logs from a block and stores them in the log storage // for any logs that are related to executing messages, they are decoded and stored -func (p *logProcessor) ProcessLogs(_ context.Context, block eth.L1BlockRef, rcpts ethTypes.Receipts) error { +func (p *logProcessor) ProcessLogs(_ context.Context, block eth.L2BlockRef, rcpts ethTypes.Receipts) error { for _, rcpt := range rcpts { for _, l := range rcpt.Logs { // log hash represents the hash of *this* log as a potentially initiating message @@ -60,7 +65,7 @@ func (p *logProcessor) ProcessLogs(_ context.Context, block eth.L1BlockRef, rcpt } } } - if err := p.logStore.SealBlock(p.chain, block.ParentHash, block.ID(), block.Time); err != nil { + if err := p.logStore.SealBlock(p.chain, block); err != nil { return fmt.Errorf("failed to seal block %s: %w", block.ID(), err) } return nil diff --git a/op-supervisor/supervisor/backend/source/log_processor_test.go b/op-supervisor/supervisor/backend/source/log_processor_test.go index bd7aa7abc3d1..6e96d731fcff 100644 --- a/op-supervisor/supervisor/backend/source/log_processor_test.go +++ b/op-supervisor/supervisor/backend/source/log_processor_test.go @@ -17,7 +17,7 @@ var logProcessorChainID = types.ChainIDFromUInt64(4) func TestLogProcessor(t *testing.T) { ctx := context.Background() - block1 := eth.L1BlockRef{ + block1 := eth.L2BlockRef{ ParentHash: common.Hash{0x42}, Number: 100, Hash: common.Hash{0x11}, @@ -205,14 +205,14 @@ type stubLogStorage struct { seals []storedSeal } -func (s *stubLogStorage) SealBlock(chainID types.ChainID, parentHash common.Hash, block eth.BlockID, timestamp uint64) error { +func (s *stubLogStorage) SealBlock(chainID types.ChainID, block eth.L2BlockRef) error { if logProcessorChainID != chainID { return fmt.Errorf("chain id mismatch, expected %v but got %v", logProcessorChainID, chainID) } s.seals = append(s.seals, storedSeal{ - parent: parentHash, - block: block, - timestamp: timestamp, + parent: block.ParentHash, + block: block.ID(), + timestamp: block.Time, }) return nil }