Skip to content

Commit

Permalink
use RWMap for Supervisor Backend (ethereum-optimism#12785)
Browse files Browse the repository at this point in the history
* use RWMap for Supervisor Backend

* adjust Clear method
  • Loading branch information
axelKingsley authored Nov 1, 2024
1 parent 39dc079 commit 1ebbd29
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 103 deletions.
7 changes: 7 additions & 0 deletions op-service/locks/rwmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,10 @@ func (m *RWMap[K, V]) Range(f func(key K, value V) bool) {
}
}
}

// Clear empties the map, dropping every key-value pair it currently holds.
// The write lock on m.mu is held for the duration of the removal.
func (m *RWMap[K, V]) Clear() {
	m.mu.Lock()
	// The builtin clear deletes all entries of a map and is a no-op on a nil
	// map, so nothing below can panic and a plain Unlock is sufficient.
	clear(m.inner)
	m.mu.Unlock()
}
152 changes: 50 additions & 102 deletions op-supervisor/supervisor/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

Expand All @@ -14,6 +13,7 @@ import (
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/locks"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/cross"
Expand All @@ -30,33 +30,28 @@ type SupervisorBackend struct {
m Metrics
dataDir string

// RW lock to avoid concurrent map mutations.
// Read = any chain may be used and mutated.
// Write = set of chains is changing.
mu sync.RWMutex

// depSet is the dependency set that the backend uses to know about the chains it is indexing
depSet depset.DependencySet

// chainDBs holds on to the DB indices for each chain
chainDBs *db.ChainsDB

// chainProcessors are notified of new unsafe blocks, and add the unsafe log events data into the events DB
chainProcessors map[types.ChainID]*processors.ChainProcessor
chainProcessors locks.RWMap[types.ChainID, *processors.ChainProcessor]

// crossSafeProcessors take local-safe data and promote it to cross-safe when verified
crossSafeProcessors map[types.ChainID]*cross.Worker
crossSafeProcessors locks.RWMap[types.ChainID, *cross.Worker]

// crossUnsafeProcessors take local-unsafe data and promote it to cross-unsafe when verified
crossUnsafeProcessors map[types.ChainID]*cross.Worker
crossUnsafeProcessors locks.RWMap[types.ChainID, *cross.Worker]

// chainMetrics are used to track metrics for each chain
// they are reused for processors and databases of the same chain
chainMetrics locks.RWMap[types.ChainID, *chainMetrics]

// synchronousProcessors disables background-workers,
// requiring manual triggers for the backend to process anything.
synchronousProcessors bool

// chainMetrics are used to track metrics for each chain
// they are reused for processors and databases of the same chain
chainMetrics map[types.ChainID]*chainMetrics
}

var _ frontend.Backend = (*SupervisorBackend)(nil)
Expand All @@ -74,22 +69,17 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
if err != nil {
return nil, fmt.Errorf("failed to load dependency set: %w", err)
}
chains := depSet.Chains()

// create initial per-chain resources
chainsDBs := db.NewChainsDB(logger, depSet)

// create the supervisor backend
super := &SupervisorBackend{
logger: logger,
m: m,
dataDir: cfg.Datadir,
depSet: depSet,
chainDBs: chainsDBs,
chainProcessors: make(map[types.ChainID]*processors.ChainProcessor, len(chains)),
chainMetrics: make(map[types.ChainID]*chainMetrics, len(chains)),
crossUnsafeProcessors: make(map[types.ChainID]*cross.Worker, len(chains)),
crossSafeProcessors: make(map[types.ChainID]*cross.Worker, len(chains)),
logger: logger,
m: m,
dataDir: cfg.Datadir,
depSet: depSet,
chainDBs: chainsDBs,
// For testing we can avoid running the processors.
synchronousProcessors: cfg.SynchronousProcessors,
}
Expand Down Expand Up @@ -120,19 +110,19 @@ func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Conf
// initialize all cross-unsafe processors
for _, chainID := range chains {
worker := cross.NewCrossUnsafeWorker(su.logger, chainID, su.chainDBs)
su.crossUnsafeProcessors[chainID] = worker
su.crossUnsafeProcessors.Set(chainID, worker)
}
// initialize all cross-safe processors
for _, chainID := range chains {
worker := cross.NewCrossSafeWorker(su.logger, chainID, su.chainDBs)
su.crossSafeProcessors[chainID] = worker
su.crossSafeProcessors.Set(chainID, worker)
}
// For each chain initialize a chain processor service,
// after cross-unsafe workers are ready to receive updates
for _, chainID := range chains {
logProcessor := processors.NewLogProcessor(chainID, su.chainDBs)
chainProcessor := processors.NewChainProcessor(su.logger, chainID, logProcessor, su.chainDBs, su.onIndexedLocalUnsafeData)
su.chainProcessors[chainID] = chainProcessor
su.chainProcessors.Set(chainID, chainProcessor)
}

// the config has some RPC connections to attach to the chain-processors
Expand All @@ -148,39 +138,35 @@ func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Conf
// onIndexedLocalUnsafeData is called by the event indexing workers.
// This signals to cross-unsafe workers that there's data to index.
func (su *SupervisorBackend) onIndexedLocalUnsafeData() {
su.mu.RLock()
defer su.mu.RUnlock()

// We signal all workers, since dependencies on a chain may be unblocked
// by new data on other chains.
// Busy workers don't block processing.
// The signal is picked up only if the worker is running in the background.
for _, w := range su.crossUnsafeProcessors {
su.crossUnsafeProcessors.Range(func(_ types.ChainID, w *cross.Worker) bool {
w.OnNewData()
}
return true
})
}

// onNewLocalSafeData is called by the safety-indexing.
// This signals to cross-safe workers that there's data to index.
func (su *SupervisorBackend) onNewLocalSafeData() {
su.mu.RLock()
defer su.mu.RUnlock()

// We signal all workers, since dependencies on a chain may be unblocked
// by new data on other chains.
// Busy workers don't block processing.
// The signal is picked up only if the worker is running in the background.
for _, w := range su.crossSafeProcessors {
su.crossSafeProcessors.Range(func(_ types.ChainID, w *cross.Worker) bool {
w.OnNewData()
}
return true
})
}

// openChainDBs initializes all the DB resources of a specific chain.
// It is a sub-task of initResources.
func (su *SupervisorBackend) openChainDBs(chainID types.ChainID) error {
cm := newChainMetrics(chainID, su.m)
// create metrics and a logdb for the chain
su.chainMetrics[chainID] = cm
su.chainMetrics.Set(chainID, cm)

logDB, err := db.OpenLogDB(su.logger, chainID, su.dataDir, cm)
if err != nil {
Expand Down Expand Up @@ -216,7 +202,7 @@ func (su *SupervisorBackend) attachRPC(ctx context.Context, rpc string) error {
if !su.depSet.HasChain(chainID) {
return fmt.Errorf("chain %s is not part of the interop dependency set: %w", chainID, types.ErrUnknownChain)
}
cm, ok := su.chainMetrics[chainID]
cm, ok := su.chainMetrics.Get(chainID)
if !ok {
return fmt.Errorf("failed to find metrics for chain %v", chainID)
}
Expand All @@ -236,10 +222,7 @@ func (su *SupervisorBackend) attachRPC(ctx context.Context, rpc string) error {
}

func (su *SupervisorBackend) AttachProcessorSource(chainID types.ChainID, src processors.Source) error {
su.mu.RLock()
defer su.mu.RUnlock()

proc, ok := su.chainProcessors[chainID]
proc, ok := su.chainProcessors.Get(chainID)
if !ok {
return fmt.Errorf("unknown chain %s, cannot attach RPC to processor", chainID)
}
Expand All @@ -260,9 +243,6 @@ func clientForL2(ctx context.Context, logger log.Logger, rpc string) (client.RPC
}

func (su *SupervisorBackend) Start(ctx context.Context) error {
su.mu.Lock()
defer su.mu.Unlock()

// ensure we only start once
if !su.started.CompareAndSwap(false, true) {
return errors.New("already started")
Expand All @@ -276,56 +256,56 @@ func (su *SupervisorBackend) Start(ctx context.Context) error {

if !su.synchronousProcessors {
// Make all the chain-processors run automatic background processing
for _, processor := range su.chainProcessors {
su.chainProcessors.Range(func(_ types.ChainID, processor *processors.ChainProcessor) bool {
processor.StartBackground()
}
for _, worker := range su.crossUnsafeProcessors {
return true
})
su.crossUnsafeProcessors.Range(func(_ types.ChainID, worker *cross.Worker) bool {
worker.StartBackground()
}
for _, worker := range su.crossSafeProcessors {
return true
})
su.crossSafeProcessors.Range(func(_ types.ChainID, worker *cross.Worker) bool {
worker.StartBackground()
}
return true
})
}

return nil
}

func (su *SupervisorBackend) Stop(ctx context.Context) error {
su.mu.Lock()
defer su.mu.Unlock()

if !su.started.CompareAndSwap(true, false) {
return errAlreadyStopped
}
su.logger.Info("Closing supervisor backend")
// close all processors
for id, processor := range su.chainProcessors {
su.chainProcessors.Range(func(id types.ChainID, processor *processors.ChainProcessor) bool {
su.logger.Info("stopping chain processor", "chainID", id)
processor.Close()
}
clear(su.chainProcessors)
return true
})
su.chainProcessors.Clear()

for id, worker := range su.crossUnsafeProcessors {
su.crossUnsafeProcessors.Range(func(id types.ChainID, worker *cross.Worker) bool {
su.logger.Info("stopping cross-unsafe processor", "chainID", id)
worker.Close()
}
clear(su.crossUnsafeProcessors)
return true
})
su.crossUnsafeProcessors.Clear()

for id, worker := range su.crossSafeProcessors {
su.crossSafeProcessors.Range(func(id types.ChainID, worker *cross.Worker) bool {
su.logger.Info("stopping cross-safe processor", "chainID", id)
worker.Close()
}
clear(su.crossSafeProcessors)
return true
})
su.crossSafeProcessors.Clear()

// close the databases
return su.chainDBs.Close()
}

// AddL2RPC attaches an RPC as the RPC for the given chain, overriding the previous RPC source, if any.
func (su *SupervisorBackend) AddL2RPC(ctx context.Context, rpc string) error {
su.mu.RLock() // read-lock: we only modify an existing chain, we don't add/remove chains
defer su.mu.RUnlock()

return su.attachRPC(ctx, rpc)
}

Expand All @@ -340,9 +320,6 @@ func (su *SupervisorBackend) DependencySet() depset.DependencySet {
// ----------------------------

func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error) {
su.mu.RLock()
defer su.mu.RUnlock()

logHash := types.PayloadHashToLogHash(payloadHash, identifier.Origin)
chainID := identifier.ChainID
blockNum := identifier.BlockNumber
Expand All @@ -365,9 +342,6 @@ func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHa
func (su *SupervisorBackend) CheckMessages(
messages []types.Message,
minSafety types.SafetyLevel) error {
su.mu.RLock()
defer su.mu.RUnlock()

su.logger.Debug("Checking messages", "count", len(messages), "minSafety", minSafety)

for _, msg := range messages {
Expand All @@ -393,9 +367,6 @@ func (su *SupervisorBackend) CheckMessages(
}

func (su *SupervisorBackend) UnsafeView(ctx context.Context, chainID types.ChainID, unsafe types.ReferenceView) (types.ReferenceView, error) {
su.mu.RLock()
defer su.mu.RUnlock()

head, err := su.chainDBs.LocalUnsafe(chainID)
if err != nil {
return types.ReferenceView{}, fmt.Errorf("failed to get local-unsafe head: %w", err)
Expand All @@ -414,9 +385,6 @@ func (su *SupervisorBackend) UnsafeView(ctx context.Context, chainID types.Chain
}

func (su *SupervisorBackend) SafeView(ctx context.Context, chainID types.ChainID, safe types.ReferenceView) (types.ReferenceView, error) {
su.mu.RLock()
defer su.mu.RUnlock()

_, localSafe, err := su.chainDBs.LocalSafe(chainID)
if err != nil {
return types.ReferenceView{}, fmt.Errorf("failed to get local-safe head: %w", err)
Expand All @@ -435,9 +403,6 @@ func (su *SupervisorBackend) SafeView(ctx context.Context, chainID types.ChainID
}

func (su *SupervisorBackend) Finalized(ctx context.Context, chainID types.ChainID) (eth.BlockID, error) {
su.mu.RLock()
defer su.mu.RUnlock()

v, err := su.chainDBs.Finalized(chainID)
if err != nil {
return eth.BlockID{}, err
Expand All @@ -446,9 +411,6 @@ func (su *SupervisorBackend) Finalized(ctx context.Context, chainID types.ChainI
}

func (su *SupervisorBackend) CrossDerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockRef, err error) {
su.mu.RLock()
defer su.mu.RUnlock()

v, err := su.chainDBs.CrossDerivedFromBlockRef(chainID, derived)
if err != nil {
return eth.BlockRef{}, err
Expand All @@ -460,19 +422,14 @@ func (su *SupervisorBackend) CrossDerivedFrom(ctx context.Context, chainID types
// ----------------------------

func (su *SupervisorBackend) UpdateLocalUnsafe(ctx context.Context, chainID types.ChainID, head eth.BlockRef) error {
su.mu.RLock()
defer su.mu.RUnlock()
ch, ok := su.chainProcessors[chainID]
ch, ok := su.chainProcessors.Get(chainID)
if !ok {
return types.ErrUnknownChain
}
return ch.OnNewHead(head)
}

func (su *SupervisorBackend) UpdateLocalSafe(ctx context.Context, chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error {
su.mu.RLock()
defer su.mu.RUnlock()

err := su.chainDBs.UpdateLocalSafe(chainID, derivedFrom, lastDerived)
if err != nil {
return err
Expand All @@ -482,19 +439,14 @@ func (su *SupervisorBackend) UpdateLocalSafe(ctx context.Context, chainID types.
}

func (su *SupervisorBackend) UpdateFinalizedL1(ctx context.Context, chainID types.ChainID, finalized eth.BlockRef) error {
su.mu.RLock()
defer su.mu.RUnlock()

return su.chainDBs.UpdateFinalizedL1(finalized)
}

// Access to synchronous processing for tests
// ----------------------------

func (su *SupervisorBackend) SyncEvents(chainID types.ChainID) error {
su.mu.RLock()
defer su.mu.RUnlock()
ch, ok := su.chainProcessors[chainID]
ch, ok := su.chainProcessors.Get(chainID)
if !ok {
return types.ErrUnknownChain
}
Expand All @@ -503,19 +455,15 @@ func (su *SupervisorBackend) SyncEvents(chainID types.ChainID) error {
}

func (su *SupervisorBackend) SyncCrossUnsafe(chainID types.ChainID) error {
su.mu.RLock()
defer su.mu.RUnlock()
ch, ok := su.crossUnsafeProcessors[chainID]
ch, ok := su.crossUnsafeProcessors.Get(chainID)
if !ok {
return types.ErrUnknownChain
}
return ch.ProcessWork()
}

func (su *SupervisorBackend) SyncCrossSafe(chainID types.ChainID) error {
su.mu.RLock()
defer su.mu.RUnlock()
ch, ok := su.crossSafeProcessors[chainID]
ch, ok := su.crossSafeProcessors.Get(chainID)
if !ok {
return types.ErrUnknownChain
}
Expand Down
Loading

0 comments on commit 1ebbd29

Please sign in to comment.