diff --git a/op-service/locks/rwmap.go b/op-service/locks/rwmap.go
index 779e3554c824..2a6badf25525 100644
--- a/op-service/locks/rwmap.go
+++ b/op-service/locks/rwmap.go
@@ -46,3 +46,10 @@ func (m *RWMap[K, V]) Range(f func(key K, value V) bool) {
 		}
 	}
 }
+
+// Clear removes all key-value pairs from the map.
+func (m *RWMap[K, V]) Clear() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	clear(m.inner)
+}
diff --git a/op-supervisor/supervisor/backend/backend.go b/op-supervisor/supervisor/backend/backend.go
index 37596c5d8b6b..7d368cb40f15 100644
--- a/op-supervisor/supervisor/backend/backend.go
+++ b/op-supervisor/supervisor/backend/backend.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"sync"
 	"sync/atomic"
 	"time"
 
@@ -14,6 +13,7 @@ import (
 	"github.com/ethereum-optimism/optimism/op-service/client"
 	"github.com/ethereum-optimism/optimism/op-service/dial"
 	"github.com/ethereum-optimism/optimism/op-service/eth"
+	"github.com/ethereum-optimism/optimism/op-service/locks"
 	"github.com/ethereum-optimism/optimism/op-service/sources"
 	"github.com/ethereum-optimism/optimism/op-supervisor/config"
 	"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/cross"
@@ -30,11 +30,6 @@ type SupervisorBackend struct {
 	m       Metrics
 	dataDir string
 
-	// RW lock to avoid concurrent map mutations.
-	// Read = any chain may be used and mutated.
-	// Write = set of chains is changing.
-	mu sync.RWMutex
-
 	// depSet is the dependency set that the backend uses to know about the chains it is indexing
 	depSet depset.DependencySet
 
@@ -42,21 +37,21 @@ type SupervisorBackend struct {
 	chainDBs *db.ChainsDB
 
 	// chainProcessors are notified of new unsafe blocks, and add the unsafe log events data into the events DB
-	chainProcessors map[types.ChainID]*processors.ChainProcessor
+	chainProcessors locks.RWMap[types.ChainID, *processors.ChainProcessor]
 
 	// crossSafeProcessors take local-safe data and promote it to cross-safe when verified
-	crossSafeProcessors map[types.ChainID]*cross.Worker
+	crossSafeProcessors locks.RWMap[types.ChainID, *cross.Worker]
 
 	// crossUnsafeProcessors take local-unsafe data and promote it to cross-unsafe when verified
-	crossUnsafeProcessors map[types.ChainID]*cross.Worker
+	crossUnsafeProcessors locks.RWMap[types.ChainID, *cross.Worker]
+
+	// chainMetrics are used to track metrics for each chain
+	// they are reused for processors and databases of the same chain
+	chainMetrics locks.RWMap[types.ChainID, *chainMetrics]
 
 	// synchronousProcessors disables background-workers,
 	// requiring manual triggers for the backend to process anything.
 	synchronousProcessors bool
-
-	// chainMetrics are used to track metrics for each chain
-	// they are reused for processors and databases of the same chain
-	chainMetrics map[types.ChainID]*chainMetrics
 }
 
 var _ frontend.Backend = (*SupervisorBackend)(nil)
@@ -74,22 +69,17 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg
 	if err != nil {
 		return nil, fmt.Errorf("failed to load dependency set: %w", err)
 	}
-	chains := depSet.Chains()
 
 	// create initial per-chain resources
 	chainsDBs := db.NewChainsDB(logger, depSet)
 
 	// create the supervisor backend
 	super := &SupervisorBackend{
-		logger:                logger,
-		m:                     m,
-		dataDir:               cfg.Datadir,
-		depSet:                depSet,
-		chainDBs:              chainsDBs,
-		chainProcessors:       make(map[types.ChainID]*processors.ChainProcessor, len(chains)),
-		chainMetrics:          make(map[types.ChainID]*chainMetrics, len(chains)),
-		crossUnsafeProcessors: make(map[types.ChainID]*cross.Worker, len(chains)),
-		crossSafeProcessors:   make(map[types.ChainID]*cross.Worker, len(chains)),
+		logger:   logger,
+		m:        m,
+		dataDir:  cfg.Datadir,
+		depSet:   depSet,
+		chainDBs: chainsDBs,
 		// For testing we can avoid running the processors.
 		synchronousProcessors: cfg.SynchronousProcessors,
 	}
@@ -120,19 +110,19 @@ func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Conf
 	// initialize all cross-unsafe processors
 	for _, chainID := range chains {
 		worker := cross.NewCrossUnsafeWorker(su.logger, chainID, su.chainDBs)
-		su.crossUnsafeProcessors[chainID] = worker
+		su.crossUnsafeProcessors.Set(chainID, worker)
 	}
 	// initialize all cross-safe processors
 	for _, chainID := range chains {
 		worker := cross.NewCrossSafeWorker(su.logger, chainID, su.chainDBs)
-		su.crossSafeProcessors[chainID] = worker
+		su.crossSafeProcessors.Set(chainID, worker)
 	}
 	// For each chain initialize a chain processor service,
 	// after cross-unsafe workers are ready to receive updates
 	for _, chainID := range chains {
 		logProcessor := processors.NewLogProcessor(chainID, su.chainDBs)
 		chainProcessor := processors.NewChainProcessor(su.logger, chainID, logProcessor, su.chainDBs, su.onIndexedLocalUnsafeData)
-		su.chainProcessors[chainID] = chainProcessor
+		su.chainProcessors.Set(chainID, chainProcessor)
 	}
 
 	// the config has some RPC connections to attach to the chain-processors
@@ -148,31 +138,27 @@ func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Conf
 // onIndexedLocalUnsafeData is called by the event indexing workers.
 // This signals to cross-unsafe workers that there's data to index.
 func (su *SupervisorBackend) onIndexedLocalUnsafeData() {
-	su.mu.RLock()
-	defer su.mu.RUnlock()
-
 	// We signal all workers, since dependencies on a chain may be unblocked
 	// by new data on other chains.
 	// Busy workers don't block processing.
 	// The signal is picked up only if the worker is running in the background.
-	for _, w := range su.crossUnsafeProcessors {
+	su.crossUnsafeProcessors.Range(func(_ types.ChainID, w *cross.Worker) bool {
 		w.OnNewData()
-	}
+		return true
+	})
 }
 
 // onNewLocalSafeData is called by the safety-indexing.
 // This signals to cross-safe workers that there's data to index.
 func (su *SupervisorBackend) onNewLocalSafeData() {
-	su.mu.RLock()
-	defer su.mu.RUnlock()
-
 	// We signal all workers, since dependencies on a chain may be unblocked
 	// by new data on other chains.
 	// Busy workers don't block processing.
 	// The signal is picked up only if the worker is running in the background.
-	for _, w := range su.crossSafeProcessors {
+	su.crossSafeProcessors.Range(func(_ types.ChainID, w *cross.Worker) bool {
 		w.OnNewData()
-	}
+		return true
+	})
 }
 
 // openChainDBs initializes all the DB resources of a specific chain.
@@ -180,7 +166,7 @@ func (su *SupervisorBackend) onNewLocalSafeData() {
 func (su *SupervisorBackend) openChainDBs(chainID types.ChainID) error {
 	cm := newChainMetrics(chainID, su.m)
 	// create metrics and a logdb for the chain
-	su.chainMetrics[chainID] = cm
+	su.chainMetrics.Set(chainID, cm)
 	logDB, err := db.OpenLogDB(su.logger, chainID, su.dataDir, cm)
 	if err != nil {
@@ -216,7 +202,7 @@ func (su *SupervisorBackend) attachRPC(ctx context.Context, rpc string) error {
 	if !su.depSet.HasChain(chainID) {
 		return fmt.Errorf("chain %s is not part of the interop dependency set: %w", chainID, types.ErrUnknownChain)
 	}
-	cm, ok := su.chainMetrics[chainID]
+	cm, ok := su.chainMetrics.Get(chainID)
 	if !ok {
 		return fmt.Errorf("failed to find metrics for chain %v", chainID)
 	}
@@ -236,10 +222,7 @@ func (su *SupervisorBackend) attachRPC(ctx context.Context, rpc string) error {
 }
 
 func (su *SupervisorBackend) AttachProcessorSource(chainID types.ChainID, src processors.Source) error {
-	su.mu.RLock()
-	defer su.mu.RUnlock()
-
-	proc, ok := su.chainProcessors[chainID]
+	proc, ok := su.chainProcessors.Get(chainID)
 	if !ok {
 		return fmt.Errorf("unknown chain %s, cannot attach RPC to processor", chainID)
 	}
@@ -260,9 +243,6 @@ func clientForL2(ctx context.Context, logger log.Logger, rpc string) (client.RPC
 }
 
 func (su *SupervisorBackend) Start(ctx context.Context) error {
-	su.mu.Lock()
-	defer su.mu.Unlock()
-
 	// ensure we only start once
 	if !su.started.CompareAndSwap(false, true) {
 		return errors.New("already started")
 	}
@@ -276,46 +256,49 @@ func (su *SupervisorBackend) Start(ctx context.Context) error {
 
 	if !su.synchronousProcessors {
 		// Make all the chain-processors run automatic background processing
-		for _, processor := range su.chainProcessors {
+		su.chainProcessors.Range(func(_ types.ChainID, processor *processors.ChainProcessor) bool {
 			processor.StartBackground()
-		}
+			return true
+		})
-		for _, worker := range su.crossUnsafeProcessors {
+		su.crossUnsafeProcessors.Range(func(_ types.ChainID, worker *cross.Worker) bool {
 			worker.StartBackground()
-		}
+			return true
+		})
-		for _, worker := range su.crossSafeProcessors {
+		su.crossSafeProcessors.Range(func(_ types.ChainID, worker *cross.Worker) bool {
 			worker.StartBackground()
-		}
+			return true
+		})
 	}
 	return nil
 }
 
 func (su *SupervisorBackend) Stop(ctx context.Context) error {
-	su.mu.Lock()
-	defer su.mu.Unlock()
-
 	if !su.started.CompareAndSwap(true, false) {
 		return errAlreadyStopped
 	}
 	su.logger.Info("Closing supervisor backend")
 
 	// close all processors
-	for id, processor := range su.chainProcessors {
+	su.chainProcessors.Range(func(id types.ChainID, processor *processors.ChainProcessor) bool {
 		su.logger.Info("stopping chain processor", "chainID", id)
 		processor.Close()
-	}
-	clear(su.chainProcessors)
+		return true
+	})
+	su.chainProcessors.Clear()
 
-	for id, worker := range su.crossUnsafeProcessors {
+	su.crossUnsafeProcessors.Range(func(id types.ChainID, worker *cross.Worker) bool {
 		su.logger.Info("stopping cross-unsafe processor", "chainID", id)
 		worker.Close()
-	}
-	clear(su.crossUnsafeProcessors)
+		return true
+	})
+	su.crossUnsafeProcessors.Clear()
 
-	for id, worker := range su.crossSafeProcessors {
+	su.crossSafeProcessors.Range(func(id types.ChainID, worker *cross.Worker) bool {
 		su.logger.Info("stopping cross-safe processor", "chainID", id)
 		worker.Close()
-	}
-	clear(su.crossSafeProcessors)
+		return true
+	})
+	su.crossSafeProcessors.Clear()
 
 	// close the databases
 	return su.chainDBs.Close()
@@ -323,9 +306,6 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error {
 
 // AddL2RPC attaches an RPC as the RPC for the given chain, overriding the previous RPC source, if any.
 func (su *SupervisorBackend) AddL2RPC(ctx context.Context, rpc string) error {
-	su.mu.RLock() // read-lock: we only modify an existing chain, we don't add/remove chains
-	defer su.mu.RUnlock()
-
 	return su.attachRPC(ctx, rpc)
 }
@@ -340,9 +320,6 @@ func (su *SupervisorBackend) DependencySet() depset.DependencySet {
 // ----------------------------
 
 func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHash common.Hash) (types.SafetyLevel, error) {
-	su.mu.RLock()
-	defer su.mu.RUnlock()
-
 	logHash := types.PayloadHashToLogHash(payloadHash, identifier.Origin)
 	chainID := identifier.ChainID
 	blockNum := identifier.BlockNumber
@@ -365,9 +342,6 @@ func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHa
 func (su *SupervisorBackend) CheckMessages(
 	messages []types.Message, minSafety types.SafetyLevel) error {
-	su.mu.RLock()
-	defer su.mu.RUnlock()
-
 	su.logger.Debug("Checking messages", "count", len(messages), "minSafety", minSafety)
 
 	for _, msg := range messages {
@@ -393,9 +367,6 @@ func (su *SupervisorBackend) CheckMessages(
 }
 
 func (su *SupervisorBackend) UnsafeView(ctx context.Context, chainID types.ChainID, unsafe types.ReferenceView) (types.ReferenceView, error) {
-	su.mu.RLock()
-	defer su.mu.RUnlock()
-
 	head, err := su.chainDBs.LocalUnsafe(chainID)
 	if err != nil {
 		return types.ReferenceView{}, fmt.Errorf("failed to get local-unsafe head: %w", err)
 	}
@@ -414,9 +385,6 @@ func (su *SupervisorBackend) UnsafeView(ctx context.Context, chainID types.Chain
 }
 
 func (su *SupervisorBackend) SafeView(ctx context.Context, chainID types.ChainID, safe types.ReferenceView) (types.ReferenceView, error) {
-	su.mu.RLock()
-	defer su.mu.RUnlock()
-
 	_, localSafe, err := su.chainDBs.LocalSafe(chainID)
 	if err != nil {
 		return types.ReferenceView{}, fmt.Errorf("failed to get local-safe head: %w", err)
 	}
@@ -435,9 +403,6 @@ func (su *SupervisorBackend) SafeView(ctx context.Context, chainID types.ChainID
 }
 
 func (su *SupervisorBackend) Finalized(ctx context.Context, chainID types.ChainID) (eth.BlockID, error) {
-	su.mu.RLock()
-	defer su.mu.RUnlock()
-
 	v, err := su.chainDBs.Finalized(chainID)
 	if err != nil {
 		return eth.BlockID{}, err
 	}
@@ -446,9 +411,6 @@ func (su *SupervisorBackend) Finalized(ctx context.Context, chainID types.ChainI
 }
 
 func (su *SupervisorBackend) CrossDerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockRef, err error) {
-	su.mu.RLock()
-	defer su.mu.RUnlock()
-
 	v, err := su.chainDBs.CrossDerivedFromBlockRef(chainID, derived)
 	if err != nil {
 		return eth.BlockRef{}, err
 	}
@@ -460,9 +422,7 @@ func (su *SupervisorBackend) CrossDerivedFrom(ctx context.Context, chainID types
 // ----------------------------
 
 func (su *SupervisorBackend) UpdateLocalUnsafe(ctx context.Context, chainID types.ChainID, head eth.BlockRef) error {
-	su.mu.RLock()
-	defer su.mu.RUnlock()
-	ch, ok := su.chainProcessors[chainID]
+	ch, ok := su.chainProcessors.Get(chainID)
 	if !ok {
 		return types.ErrUnknownChain
 	}
@@ -470,9 +430,6 @@ func (su *SupervisorBackend) UpdateLocalUnsafe(ctx context.Context, chainID type
 }
 
 func (su *SupervisorBackend) UpdateLocalSafe(ctx context.Context, chainID types.ChainID, derivedFrom eth.BlockRef, lastDerived eth.BlockRef) error {
-	su.mu.RLock()
-	defer su.mu.RUnlock()
-
 	err := su.chainDBs.UpdateLocalSafe(chainID, derivedFrom, lastDerived)
 	if err != nil {
 		return err
 	}
@@ -482,9 +439,6 @@ func (su *SupervisorBackend) UpdateLocalSafe(ctx context.Context, chainID types.
 }
 
 func (su *SupervisorBackend) UpdateFinalizedL1(ctx context.Context, chainID types.ChainID, finalized eth.BlockRef) error {
-	su.mu.RLock()
-	defer su.mu.RUnlock()
-
 	return su.chainDBs.UpdateFinalizedL1(finalized)
 }
@@ -492,9 +446,7 @@ func (su *SupervisorBackend) UpdateFinalizedL1(ctx context.Context, chainID type
 // ----------------------------
 
 func (su *SupervisorBackend) SyncEvents(chainID types.ChainID) error {
-	su.mu.RLock()
-	defer su.mu.RUnlock()
-	ch, ok := su.chainProcessors[chainID]
+	ch, ok := su.chainProcessors.Get(chainID)
 	if !ok {
 		return types.ErrUnknownChain
 	}
@@ -503,9 +455,7 @@ func (su *SupervisorBackend) SyncEvents(chainID types.ChainID) error {
 }
 
 func (su *SupervisorBackend) SyncCrossUnsafe(chainID types.ChainID) error {
-	su.mu.RLock()
-	defer su.mu.RUnlock()
-	ch, ok := su.crossUnsafeProcessors[chainID]
+	ch, ok := su.crossUnsafeProcessors.Get(chainID)
 	if !ok {
 		return types.ErrUnknownChain
 	}
@@ -513,9 +463,7 @@ func (su *SupervisorBackend) SyncCrossUnsafe(chainID types.ChainID) error {
 }
 
 func (su *SupervisorBackend) SyncCrossSafe(chainID types.ChainID) error {
-	su.mu.RLock()
-	defer su.mu.RUnlock()
-	ch, ok := su.crossSafeProcessors[chainID]
+	ch, ok := su.crossSafeProcessors.Get(chainID)
 	if !ok {
 		return types.ErrUnknownChain
 	}
diff --git a/op-supervisor/supervisor/backend/backend_test.go b/op-supervisor/supervisor/backend/backend_test.go
index 6ddfcac662b2..c104bf0bae5d 100644
--- a/op-supervisor/supervisor/backend/backend_test.go
+++ b/op-supervisor/supervisor/backend/backend_test.go
@@ -117,7 +117,8 @@ func TestBackendLifetime(t *testing.T) {
 	require.NoError(t, err)
 	// Make the processing happen, so we can rely on the new chain information,
 	// and not run into errors for future data that isn't mocked at this time.
-	b.chainProcessors[chainA].ProcessToHead()
+	proc, _ := b.chainProcessors.Get(chainA)
+	proc.ProcessToHead()
 
 	_, err = b.UnsafeView(context.Background(), chainA, types.ReferenceView{})
 	require.ErrorIs(t, err, types.ErrFuture, "still no data yet, need cross-unsafe")
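Note (not part of the diff): the sketch below illustrates the access pattern this change standardizes on, where a backend-level RWMutex plus plain maps is replaced by a small generic wrapper exposing Set/Get/Range/Clear, the four methods used above. The rwMap type here is a simplified stand-in written for illustration only, not the actual op-service/locks implementation; it assumes nothing beyond the method set visible in this diff.

package main

import (
	"fmt"
	"sync"
)

// rwMap is a simplified stand-in for a locks.RWMap-style container:
// a generic map guarded by an RWMutex, so callers never touch the lock directly.
type rwMap[K comparable, V any] struct {
	mu    sync.RWMutex
	inner map[K]V
}

// Set stores a value under the given key (write lock).
func (m *rwMap[K, V]) Set(key K, value V) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.inner == nil {
		m.inner = make(map[K]V)
	}
	m.inner[key] = value
}

// Get returns the value for the key and whether it was present (read lock).
func (m *rwMap[K, V]) Get(key K) (V, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	v, ok := m.inner[key]
	return v, ok
}

// Range calls f for each entry until f returns false (read lock held throughout).
func (m *rwMap[K, V]) Range(f func(key K, value V) bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	for k, v := range m.inner {
		if !f(k, v) {
			break
		}
	}
}

// Clear removes all entries, mirroring the Clear method added to RWMap in this diff.
func (m *rwMap[K, V]) Clear() {
	m.mu.Lock()
	defer m.mu.Unlock()
	clear(m.inner)
}

func main() {
	var workers rwMap[string, string]
	workers.Set("chainA", "cross-unsafe worker")
	workers.Set("chainB", "cross-unsafe worker")

	// Signal every worker, as the backend does in onIndexedLocalUnsafeData.
	workers.Range(func(id string, w string) bool {
		fmt.Println("signal", id, w)
		return true // keep iterating over all entries
	})

	// Shutdown-style cleanup: drop all entries once workers are done, as Stop does.
	workers.Clear()
}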