From 55ebf7bcdb396c7f1d506f9c1adf080f5c8e1474 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 29 Apr 2024 16:31:48 +0300 Subject: [PATCH 1/4] core/state, internal/workerpool: parallelize parts of state commit --- core/state/state_object.go | 3 + core/state/statedb.go | 84 +++++++++++++++++++-------- internal/workerpool/workerpool.go | 95 +++++++++++++++++++++++++++++++ 3 files changed, 157 insertions(+), 25 deletions(-) create mode 100644 internal/workerpool/workerpool.go diff --git a/core/state/state_object.go b/core/state/state_object.go index 1454f7a459a5..d75ba01376bd 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -403,6 +403,9 @@ func (s *stateObject) updateRoot() { // commit obtains a set of dirty storage trie nodes and updates the account data. // The returned set can be nil if nothing to commit. This function assumes all // storage mutations have already been flushed into trie by updateRoot. +// +// Note, commit may run concurrently across all the state objects. Do not assume +// thread-safe access to the statedb. func (s *stateObject) commit() (*trienode.NodeSet, error) { // Short circuit if trie is not even loaded, don't bother with committing anything if s.trie == nil { diff --git a/core/state/statedb.go b/core/state/statedb.go index 6d9cc907e03f..30a134e1cd6f 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -21,8 +21,10 @@ import ( "fmt" "maps" "math/big" + "runtime" "slices" "sort" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -31,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/core/tracing" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/internal/workerpool" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/trie" @@ -1146,47 +1149,78 @@ func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, er storageTrieNodesUpdated int storageTrieNodesDeleted int nodes = trienode.NewMergedNodeSet() - codeWriter = s.db.DiskDB().NewBatch() ) // Handle all state deletions first if err := s.handleDestruction(nodes); err != nil { return common.Hash{}, err } - // Handle all state updates afterwards + // Handle all state updates afterwards, concurrently to one another to shave + // off some milliseconds from the commit operation. Also accumulate the code + // writes to run in parallel with the computations. start := time.Now() + var ( + code = s.db.DiskDB().NewBatch() + lock sync.Mutex + ) + workers := workerpool.New[*stateObject, error](len(s.mutations), min(len(s.mutations), runtime.NumCPU()), + func(obj *stateObject) error { + // Write any storage changes in the state object to its storage trie + set, err := obj.commit() + if err != nil { + return err + } + // Merge the dirty nodes of storage trie into global set. It is possible + // that the account was destructed and then resurrected in the same block. + // In this case, the node set is shared by both accounts. + if set != nil { + lock.Lock() + defer lock.Unlock() + + if err = nodes.Merge(set); err != nil { + return err + } + updates, deleted := set.Size() + storageTrieNodesUpdated += updates + storageTrieNodesDeleted += deleted + } + return nil + }) + for addr, op := range s.mutations { if op.isDelete() { continue } - obj := s.stateObjects[addr] - // Write any contract code associated with the state object + obj := s.stateObjects[addr] if obj.code != nil && obj.dirtyCode { - rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code) + rawdb.WriteCode(code, common.BytesToHash(obj.CodeHash()), obj.code) obj.dirtyCode = false } - // Write any storage changes in the state object to its storage trie - set, err := obj.commit() - if err != nil { - return common.Hash{}, err - } - // Merge the dirty nodes of storage trie into global set. It is possible - // that the account was destructed and then resurrected in the same block. - // In this case, the node set is shared by both accounts. - if set != nil { - if err := nodes.Merge(set); err != nil { - return common.Hash{}, err - } - updates, deleted := set.Size() - storageTrieNodesUpdated += updates - storageTrieNodesDeleted += deleted + // Run the storage updates concurrently to one another + workers.Schedule(obj) + } + workers.Close() + + // Updates running concurrently, wait for them to complete; running the code + // writes in the meantime. + done := make(chan struct{}) + go func() { + // This goroutine is only needed to accurately measure the storage commit + // and not have the concurrent code write dirty the stats. + defer close(done) + + workers.Wait() + s.StorageCommits += time.Since(start) + }() + if code.ValueSize() > 0 { + if err := code.Write(); err != nil { + log.Crit("Failed to commit dirty codes", "error", err) } } - s.StorageCommits += time.Since(start) - - if codeWriter.ValueSize() > 0 { - if err := codeWriter.Write(); err != nil { - log.Crit("Failed to commit dirty codes", "error", err) + <-done + for err := range workers.Results() { + if err != nil { + return common.Hash{}, err } } // Write the account trie changes, measuring the amount of wasted time diff --git a/internal/workerpool/workerpool.go b/internal/workerpool/workerpool.go new file mode 100644 index 000000000000..431265a79f7a --- /dev/null +++ b/internal/workerpool/workerpool.go @@ -0,0 +1,95 @@ +// Copyright 2024 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package workerpool implements a concurrent task processor. +package workerpool + +import ( + "runtime" + "sync" +) + +// WorkerPool is a concurrent task processor, scheduling and running tasks from +// a source channel, feeding any errors into a sink. +type WorkerPool[T any, R any] struct { + tasks chan T // Input channel waiting to consume tasks + results chan R // Result channel for consuers to wait on + working sync.WaitGroup // Waitgroup blocking on worker liveness +} + +// New creates a worker pool with the given number of max task capacity and an +// optional goroutine count to execute on. If 0 threads are requested, the pool +// will default to the number of (logical) CPUs. +func New[T any, R any](tasks int, threads int, f func(T) R) *WorkerPool[T, R] { + // Create the worker pool + pool := &WorkerPool[T, R]{ + tasks: make(chan T, tasks), + results: make(chan R, tasks), + } + // Start all the data processor routines + if threads == 0 { + threads = runtime.NumCPU() + } + pool.working.Add(threads) + for i := 0; i < threads; i++ { + go pool.work(f) + } + return pool +} + +// Close signals the end of the task stream. It does not block execution, rather +// returns immediately and users have to explicitly call Wait to block until the +// pool actually spins down. Alternatively, consumers can read the results chan, +// which will be closed after the last result is delivered. +// +// Calling Close multiple times will panic. Not particularly hard to avoid, but +// it's really a programming error. +func (pool *WorkerPool[T, R]) Close() { + close(pool.tasks) + go func() { + pool.working.Wait() + close(pool.results) + }() +} + +// Wait blocks until all the scheduled tasks have been processed. +func (pool *WorkerPool[T, R]) Wait() { + pool.working.Wait() +} + +// Schedule adds a task to the work queue. +func (pool *WorkerPool[T, R]) Schedule(task T) { + pool.tasks <- task +} + +// Results retrieves the result channel to consume the output of the individual +// work tasks. The channel will be closed after all tasks are done. +// +// Note, as long as the number of actually scheduled tasks are smaller or equal +// to the requested number form the constructor, it's fine to not consume this +// channel. +func (pool *WorkerPool[T, R]) Results() chan R { + return pool.results +} + +// work is the (one of many) goroutine consuming input tasks and executing them +// to compute the results. +func (pool *WorkerPool[T, R]) work(f func(T) R) { + defer pool.working.Done() + for task := range pool.tasks { + pool.results <- f(task) + } +} From 385c7fb9fc762f98a64f4d879113ddeff7eb4de5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 29 Apr 2024 16:40:44 +0300 Subject: [PATCH 2/4] core, internal: move workerpool into syncx --- core/state/statedb.go | 4 ++-- internal/{workerpool => syncx}/workerpool.go | 11 +++++------ 2 files changed, 7 insertions(+), 8 deletions(-) rename internal/{workerpool => syncx}/workerpool.go (88%) diff --git a/core/state/statedb.go b/core/state/statedb.go index 30a134e1cd6f..f173da0b92db 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -33,7 +33,7 @@ import ( "github.com/ethereum/go-ethereum/core/tracing" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/internal/workerpool" + "github.com/ethereum/go-ethereum/internal/syncx" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/trie" @@ -1162,7 +1162,7 @@ func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, er code = s.db.DiskDB().NewBatch() lock sync.Mutex ) - workers := workerpool.New[*stateObject, error](len(s.mutations), min(len(s.mutations), runtime.NumCPU()), + workers := syncx.NewWorkerPool[*stateObject, error](len(s.mutations), min(len(s.mutations), runtime.NumCPU()), func(obj *stateObject) error { // Write any storage changes in the state object to its storage trie set, err := obj.commit() diff --git a/internal/workerpool/workerpool.go b/internal/syncx/workerpool.go similarity index 88% rename from internal/workerpool/workerpool.go rename to internal/syncx/workerpool.go index 431265a79f7a..f207ac669e6a 100644 --- a/internal/workerpool/workerpool.go +++ b/internal/syncx/workerpool.go @@ -14,8 +14,7 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -// Package workerpool implements a concurrent task processor. -package workerpool +package syncx import ( "runtime" @@ -30,10 +29,10 @@ type WorkerPool[T any, R any] struct { working sync.WaitGroup // Waitgroup blocking on worker liveness } -// New creates a worker pool with the given number of max task capacity and an -// optional goroutine count to execute on. If 0 threads are requested, the pool -// will default to the number of (logical) CPUs. -func New[T any, R any](tasks int, threads int, f func(T) R) *WorkerPool[T, R] { +// NewWorkerPool creates a worker pool with the given number of max task capacity +// and an optional goroutine count to execute on. If 0 threads are requested, the +// pool will default to the number of (logical) CPUs. +func NewWorkerPool[T any, R any](tasks int, threads int, f func(T) R) *WorkerPool[T, R] { // Create the worker pool pool := &WorkerPool[T, R]{ tasks: make(chan T, tasks), From e270da3d77b03099f65355d3eb2f91e78a404638 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 30 Apr 2024 09:06:24 +0300 Subject: [PATCH 3/4] core/state: use errgroups, commit accounts concurrently --- core/state/statedb.go | 103 ++++++++++++++++++----------------- internal/syncx/workerpool.go | 94 -------------------------------- 2 files changed, 53 insertions(+), 144 deletions(-) delete mode 100644 internal/syncx/workerpool.go diff --git a/core/state/statedb.go b/core/state/statedb.go index f173da0b92db..957cb531ccbd 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -21,7 +21,6 @@ import ( "fmt" "maps" "math/big" - "runtime" "slices" "sort" "sync" @@ -33,13 +32,13 @@ import ( "github.com/ethereum/go-ethereum/core/tracing" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/internal/syncx" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie/trienode" "github.com/ethereum/go-ethereum/trie/triestate" "github.com/holiman/uint256" + "golang.org/x/sync/errgroup" ) type revision struct { @@ -1159,11 +1158,51 @@ func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, er // writes to run in parallel with the computations. start := time.Now() var ( - code = s.db.DiskDB().NewBatch() - lock sync.Mutex + code = s.db.DiskDB().NewBatch() + lock sync.Mutex + root common.Hash + workers errgroup.Group ) - workers := syncx.NewWorkerPool[*stateObject, error](len(s.mutations), min(len(s.mutations), runtime.NumCPU()), - func(obj *stateObject) error { + // Schedule the account trie first since that will be the biggest, so give + // it the most time to crunch. + workers.Go(func() error { + // Write the account trie changes, measuring the amount of wasted time + start = time.Now() + + newroot, set, err := s.trie.Commit(true) + if err != nil { + return err + } + root = newroot + + // Merge the dirty nodes of account trie into global set + if set != nil { + lock.Lock() + defer lock.Unlock() + + if err = nodes.Merge(set); err != nil { + return err + } + accountTrieNodesUpdated, accountTrieNodesDeleted = set.Size() + } + // Report the commit metrics + s.AccountCommits += time.Since(start) + return nil + }) + // Schedule each of the storage tries that need to be updated, so they can + // run concurrently to one another. + for addr, op := range s.mutations { + if op.isDelete() { + continue + } + // Write any contract code associated with the state object + obj := s.stateObjects[addr] + if obj.code != nil && obj.dirtyCode { + rawdb.WriteCode(code, common.BytesToHash(obj.CodeHash()), obj.code) + obj.dirtyCode = false + } + // Run the storage updates concurrently to one another + workers.Go(func() error { // Write any storage changes in the state object to its storage trie set, err := obj.commit() if err != nil { @@ -1185,60 +1224,24 @@ func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, er } return nil }) - - for addr, op := range s.mutations { - if op.isDelete() { - continue - } - // Write any contract code associated with the state object - obj := s.stateObjects[addr] - if obj.code != nil && obj.dirtyCode { - rawdb.WriteCode(code, common.BytesToHash(obj.CodeHash()), obj.code) - obj.dirtyCode = false - } - // Run the storage updates concurrently to one another - workers.Schedule(obj) } - workers.Close() - // Updates running concurrently, wait for them to complete; running the code // writes in the meantime. done := make(chan struct{}) go func() { - // This goroutine is only needed to accurately measure the storage commit - // and not have the concurrent code write dirty the stats. defer close(done) - workers.Wait() - s.StorageCommits += time.Since(start) - }() - if code.ValueSize() > 0 { - if err := code.Write(); err != nil { - log.Crit("Failed to commit dirty codes", "error", err) - } - } - <-done - for err := range workers.Results() { - if err != nil { - return common.Hash{}, err + if code.ValueSize() > 0 { + if err := code.Write(); err != nil { + log.Crit("Failed to commit dirty codes", "error", err) + } } - } - // Write the account trie changes, measuring the amount of wasted time - start = time.Now() - - root, set, err := s.trie.Commit(true) - if err != nil { + }() + if err := workers.Wait(); err != nil { return common.Hash{}, err } - // Merge the dirty nodes of account trie into global set - if set != nil { - if err := nodes.Merge(set); err != nil { - return common.Hash{}, err - } - accountTrieNodesUpdated, accountTrieNodesDeleted = set.Size() - } - // Report the commit metrics - s.AccountCommits += time.Since(start) + s.StorageCommits += time.Since(start) + <-done accountUpdatedMeter.Mark(int64(s.AccountUpdated)) storageUpdatedMeter.Mark(int64(s.StorageUpdated)) diff --git a/internal/syncx/workerpool.go b/internal/syncx/workerpool.go deleted file mode 100644 index f207ac669e6a..000000000000 --- a/internal/syncx/workerpool.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright 2024 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package syncx - -import ( - "runtime" - "sync" -) - -// WorkerPool is a concurrent task processor, scheduling and running tasks from -// a source channel, feeding any errors into a sink. -type WorkerPool[T any, R any] struct { - tasks chan T // Input channel waiting to consume tasks - results chan R // Result channel for consuers to wait on - working sync.WaitGroup // Waitgroup blocking on worker liveness -} - -// NewWorkerPool creates a worker pool with the given number of max task capacity -// and an optional goroutine count to execute on. If 0 threads are requested, the -// pool will default to the number of (logical) CPUs. -func NewWorkerPool[T any, R any](tasks int, threads int, f func(T) R) *WorkerPool[T, R] { - // Create the worker pool - pool := &WorkerPool[T, R]{ - tasks: make(chan T, tasks), - results: make(chan R, tasks), - } - // Start all the data processor routines - if threads == 0 { - threads = runtime.NumCPU() - } - pool.working.Add(threads) - for i := 0; i < threads; i++ { - go pool.work(f) - } - return pool -} - -// Close signals the end of the task stream. It does not block execution, rather -// returns immediately and users have to explicitly call Wait to block until the -// pool actually spins down. Alternatively, consumers can read the results chan, -// which will be closed after the last result is delivered. -// -// Calling Close multiple times will panic. Not particularly hard to avoid, but -// it's really a programming error. -func (pool *WorkerPool[T, R]) Close() { - close(pool.tasks) - go func() { - pool.working.Wait() - close(pool.results) - }() -} - -// Wait blocks until all the scheduled tasks have been processed. -func (pool *WorkerPool[T, R]) Wait() { - pool.working.Wait() -} - -// Schedule adds a task to the work queue. -func (pool *WorkerPool[T, R]) Schedule(task T) { - pool.tasks <- task -} - -// Results retrieves the result channel to consume the output of the individual -// work tasks. The channel will be closed after all tasks are done. -// -// Note, as long as the number of actually scheduled tasks are smaller or equal -// to the requested number form the constructor, it's fine to not consume this -// channel. -func (pool *WorkerPool[T, R]) Results() chan R { - return pool.results -} - -// work is the (one of many) goroutine consuming input tasks and executing them -// to compute the results. -func (pool *WorkerPool[T, R]) work(f func(T) R) { - defer pool.working.Done() - for task := range pool.tasks { - pool.results <- f(task) - } -} From c1f3b5f466f2585b4738bf1a273e75982a655967 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Tue, 30 Apr 2024 09:29:54 +0300 Subject: [PATCH 4/4] core: resurrect detailed commit timers to almost-accuracy --- core/blockchain.go | 2 +- core/state/statedb.go | 47 +++++++++++++++++++++++++------------------ 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index e4c89668245f..3e1f8942108a 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1963,7 +1963,7 @@ func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, s snapshotCommitTimer.Update(statedb.SnapshotCommits) // Snapshot commits are complete, we can mark them triedbCommitTimer.Update(statedb.TrieDBCommits) // Trie database commits are complete, we can mark them - blockWriteTimer.Update(time.Since(wstart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits - statedb.TrieDBCommits) + blockWriteTimer.Update(time.Since(wstart) - max(statedb.AccountCommits, statedb.StorageCommits) /* concurrent */ - statedb.SnapshotCommits - statedb.TrieDBCommits) blockInsertTimer.UpdateSince(start) return &blockProcessingResult{usedGas: usedGas, procTime: proctime, status: status}, nil diff --git a/core/state/statedb.go b/core/state/statedb.go index 957cb531ccbd..66cfc8f05a32 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -1165,10 +1165,15 @@ func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, er ) // Schedule the account trie first since that will be the biggest, so give // it the most time to crunch. + // + // TODO(karalabe): This account trie commit is *very* heavy. 5-6ms at chain + // heads, which seems excessive given that it doesn't do hashing, it just + // shuffles some data. For comparison, the *hashing* at chain head is 2-3ms. + // We need to investigate what's happening as it seems something's wonky. + // Obviously it's not an end of the world issue, just something the original + // code didn't anticipate for. workers.Go(func() error { // Write the account trie changes, measuring the amount of wasted time - start = time.Now() - newroot, set, err := s.trie.Commit(true) if err != nil { return err @@ -1176,21 +1181,25 @@ func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, er root = newroot // Merge the dirty nodes of account trie into global set - if set != nil { - lock.Lock() - defer lock.Unlock() + lock.Lock() + defer lock.Unlock() + if set != nil { if err = nodes.Merge(set); err != nil { return err } accountTrieNodesUpdated, accountTrieNodesDeleted = set.Size() } - // Report the commit metrics - s.AccountCommits += time.Since(start) + s.AccountCommits = time.Since(start) return nil }) // Schedule each of the storage tries that need to be updated, so they can // run concurrently to one another. + // + // TODO(karalabe): Experimentally, the account commit takes approximately the + // same time as all the storage commits combined, so we could maybe only have + // 2 threads in total. But that kind of depends on the account commit being + // more expensive than it should be, so let's fix that and revisit this todo. for addr, op := range s.mutations { if op.isDelete() { continue @@ -1211,10 +1220,10 @@ func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, er // Merge the dirty nodes of storage trie into global set. It is possible // that the account was destructed and then resurrected in the same block. // In this case, the node set is shared by both accounts. - if set != nil { - lock.Lock() - defer lock.Unlock() + lock.Lock() + defer lock.Unlock() + if set != nil { if err = nodes.Merge(set); err != nil { return err } @@ -1222,27 +1231,25 @@ func (s *StateDB) Commit(block uint64, deleteEmptyObjects bool) (common.Hash, er storageTrieNodesUpdated += updates storageTrieNodesDeleted += deleted } + s.StorageCommits = time.Since(start) // overwrite with the longest storage commit runtime return nil }) } - // Updates running concurrently, wait for them to complete; running the code - // writes in the meantime. - done := make(chan struct{}) - go func() { - defer close(done) - + // Schedule the code commits to run concurrently too. This shouldn't really + // take much since we don't often commit code, but since it's disk access, + // it's always yolo. + workers.Go(func() error { if code.ValueSize() > 0 { if err := code.Write(); err != nil { log.Crit("Failed to commit dirty codes", "error", err) } } - }() + return nil + }) + // Wait for everything to finish and update the metrics if err := workers.Wait(); err != nil { return common.Hash{}, err } - s.StorageCommits += time.Since(start) - <-done - accountUpdatedMeter.Mark(int64(s.AccountUpdated)) storageUpdatedMeter.Mark(int64(s.StorageUpdated)) accountDeletedMeter.Mark(int64(s.AccountDeleted))