diff --git a/nodebuilder/settings.go b/nodebuilder/settings.go
index 9a9f7fcf47..c30996d24e 100644
--- a/nodebuilder/settings.go
+++ b/nodebuilder/settings.go
@@ -90,6 +90,7 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti
 	case node.Full:
 		opts = fx.Options(
 			baseComponents,
+			fx.Invoke(share.WithStoreMetrics),
 			fx.Invoke(share.WithShrexServerMetrics),
 			samplingMetrics,
 		)
@@ -101,6 +102,7 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti
 	case node.Bridge:
 		opts = fx.Options(
 			baseComponents,
+			fx.Invoke(share.WithStoreMetrics),
 			fx.Invoke(share.WithShrexServerMetrics),
 		)
 	default:
diff --git a/nodebuilder/share/opts.go b/nodebuilder/share/opts.go
index 20ba0ce58c..e236847f41 100644
--- a/nodebuilder/share/opts.go
+++ b/nodebuilder/share/opts.go
@@ -1,6 +1,7 @@
 package share
 
 import (
+	"github.com/celestiaorg/celestia-node/share/eds"
 	"github.com/celestiaorg/celestia-node/share/getters"
 	disc "github.com/celestiaorg/celestia-node/share/p2p/discovery"
 	"github.com/celestiaorg/celestia-node/share/p2p/peers"
@@ -41,3 +42,7 @@ func WithShrexServerMetrics(edsServer *shrexeds.Server, ndServer *shrexnd.Server
 func WithShrexGetterMetrics(sg *getters.ShrexGetter) error {
 	return sg.WithMetrics()
 }
+
+func WithStoreMetrics(s *eds.Store) error {
+	return s.WithMetrics()
+}
diff --git a/share/eds/metrics.go b/share/eds/metrics.go
new file mode 100644
index 0000000000..82adb246f1
--- /dev/null
+++ b/share/eds/metrics.go
@@ -0,0 +1,267 @@
+package eds
+
+import (
+	"context"
+	"time"
+
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
+)
+
+const (
+	failedKey = "failed"
+	sizeKey   = "eds_size"
+	cachedKey = "cached"
+
+	putResultKey           = "result"
+	putOK        putResult = "ok"
+	putExists    putResult = "exists"
+	putFailed    putResult = "failed"
+
+	opNameKey                     = "op"
+	longOpResultKey               = "result"
+	longOpUnresolved longOpResult = "unresolved"
+	longOpOK         longOpResult = "ok"
+	longOpFailed     longOpResult = "failed"
+)
+
+var (
+	meter = otel.Meter("eds_store")
+)
+
+type putResult string
+
+type longOpResult string
+
+type metrics struct {
+	putTime              metric.Float64Histogram
+	getCARTime           metric.Float64Histogram
+	getCARBlockstoreTime metric.Float64Histogram
+	getDAHTime           metric.Float64Histogram
+	removeTime           metric.Float64Histogram
+	getTime              metric.Float64Histogram
+	hasTime              metric.Float64Histogram
+	listTime             metric.Float64Histogram
+	getAccessorTime      metric.Float64Histogram
+
+	longOpTime metric.Float64Histogram
+	gcTime     metric.Float64Histogram
+}
+
+func (s *Store) WithMetrics() error {
+	putTime, err := meter.Float64Histogram("eds_store_put_time_histogram",
+		metric.WithDescription("eds store put time histogram(s)"))
+	if err != nil {
+		return err
+	}
+
+	getCARTime, err := meter.Float64Histogram("eds_store_get_car_time_histogram",
+		metric.WithDescription("eds store get car time histogram(s)"))
+	if err != nil {
+		return err
+	}
+
+	getCARBlockstoreTime, err := meter.Float64Histogram("eds_store_get_car_blockstore_time_histogram",
+		metric.WithDescription("eds store get car blockstore time histogram(s)"))
+	if err != nil {
+		return err
+	}
+
+	getDAHTime, err := meter.Float64Histogram("eds_store_get_dah_time_histogram",
+		metric.WithDescription("eds store get dah time histogram(s)"))
+	if err != nil {
+		return err
+	}
+
+	removeTime, err := meter.Float64Histogram("eds_store_remove_time_histogram",
+		metric.WithDescription("eds store remove time histogram(s)"))
+	if err != nil {
+		return err
+	}
+
+	getTime, err := meter.Float64Histogram("eds_store_get_time_histogram",
+		metric.WithDescription("eds store get time histogram(s)"))
+	if err != nil {
+		return err
+	}
+
+	hasTime, err := meter.Float64Histogram("eds_store_has_time_histogram",
+		metric.WithDescription("eds store has time histogram(s)"))
+	if err != nil {
+		return err
+	}
+
+	listTime, err := meter.Float64Histogram("eds_store_list_time_histogram",
+		metric.WithDescription("eds store list time histogram(s)"))
+	if err != nil {
+		return err
+	}
+
+	getAccessorTime, err := meter.Float64Histogram("eds_store_get_accessor_time_histogram",
+		metric.WithDescription("eds store get accessor time histogram(s)"))
+	if err != nil {
+		return err
+	}
+
+	longOpTime, err := meter.Float64Histogram("eds_store_long_operation_time_histogram",
+		metric.WithDescription("eds store long operation time histogram(s)"))
+	if err != nil {
+		return err
+	}
+
+	gcTime, err := meter.Float64Histogram("eds_store_gc_time",
+		metric.WithDescription("dagstore gc time histogram(s)"))
+	if err != nil {
+		return err
+	}
+
+	s.metrics = &metrics{
+		putTime:              putTime,
+		getCARTime:           getCARTime,
+		getCARBlockstoreTime: getCARBlockstoreTime,
+		getDAHTime:           getDAHTime,
+		removeTime:           removeTime,
+		getTime:              getTime,
+		hasTime:              hasTime,
+		listTime:             listTime,
+		getAccessorTime:      getAccessorTime,
+		longOpTime:           longOpTime,
+		gcTime:               gcTime,
+	}
+	return nil
+}
+
+func (m *metrics) observeGCtime(ctx context.Context, dur time.Duration, failed bool) {
+	if m == nil {
+		return
+	}
+	if ctx.Err() != nil {
+		ctx = context.Background()
+	}
+	m.gcTime.Record(ctx, dur.Seconds(), metric.WithAttributes(
+		attribute.Bool(failedKey, failed)))
+}
+
+func (m *metrics) observePut(ctx context.Context, dur time.Duration, result putResult, size uint) {
+	if m == nil {
+		return
+	}
+	if ctx.Err() != nil {
+		ctx = context.Background()
+	}
+
+	m.putTime.Record(ctx, dur.Seconds(), metric.WithAttributes(
+		attribute.String(putResultKey, string(result)),
+		attribute.Int(sizeKey, int(size))))
+}
+
+func (m *metrics) observeLongOp(ctx context.Context, opName string, dur time.Duration, result longOpResult) {
+	if m == nil {
+		return
+	}
+	if ctx.Err() != nil {
+		ctx = context.Background()
+	}
+
+	m.longOpTime.Record(ctx, dur.Seconds(), metric.WithAttributes(
+		attribute.String(opNameKey, opName),
+		attribute.String(longOpResultKey, string(result))))
+}
+
+func (m *metrics) observeGetCAR(ctx context.Context, dur time.Duration, failed bool) {
+	if m == nil {
+		return
+	}
+	if ctx.Err() != nil {
+		ctx = context.Background()
+	}
+
+	m.getCARTime.Record(ctx, dur.Seconds(), metric.WithAttributes(
+		attribute.Bool(failedKey, failed)))
+}
+
+func (m *metrics) observeCARBlockstore(ctx context.Context, dur time.Duration, failed bool) {
+	if m == nil {
+		return
+	}
+	if ctx.Err() != nil {
+		ctx = context.Background()
+	}
+
+	m.getCARBlockstoreTime.Record(ctx, dur.Seconds(), metric.WithAttributes(
+		attribute.Bool(failedKey, failed)))
+}
+
+func (m *metrics) observeGetDAH(ctx context.Context, dur time.Duration, failed bool) {
+	if m == nil {
+		return
+	}
+	if ctx.Err() != nil {
+		ctx = context.Background()
+	}
+
+	m.getDAHTime.Record(ctx, dur.Seconds(), metric.WithAttributes(
+		attribute.Bool(failedKey, failed)))
+}
+
+func (m *metrics) observeRemove(ctx context.Context, dur time.Duration, failed bool) {
+	if m == nil {
+		return
+	}
+	if ctx.Err() != nil {
+		ctx = context.Background()
+	}
+
+	m.removeTime.Record(ctx, dur.Seconds(), metric.WithAttributes(
+		attribute.Bool(failedKey, failed)))
+}
+
+func (m *metrics) observeGet(ctx context.Context, dur time.Duration, failed bool) {
+	if m == nil {
+		return
+	}
+	if ctx.Err() != nil {
+		ctx = context.Background()
+	}
+
+	m.getTime.Record(ctx, dur.Seconds(), metric.WithAttributes(
+		attribute.Bool(failedKey, failed)))
+}
+
+func (m *metrics) observeHas(ctx context.Context, dur time.Duration, failed bool) {
+	if m == nil {
+		return
+	}
+	if ctx.Err() != nil {
+		ctx = context.Background()
+	}
+
+	m.hasTime.Record(ctx, dur.Seconds(), metric.WithAttributes(
+		attribute.Bool(failedKey, failed)))
+}
+
+func (m *metrics) observeList(ctx context.Context, dur time.Duration, failed bool) {
+	if m == nil {
+		return
+	}
+	if ctx.Err() != nil {
+		ctx = context.Background()
+	}
+
+	m.listTime.Record(ctx, dur.Seconds(), metric.WithAttributes(
+		attribute.Bool(failedKey, failed)))
+}
+
+func (m *metrics) observeGetAccessor(ctx context.Context, dur time.Duration, cached, failed bool) {
+	if m == nil {
+		return
+	}
+	if ctx.Err() != nil {
+		ctx = context.Background()
+	}
+
+	m.getAccessorTime.Record(ctx, dur.Seconds(), metric.WithAttributes(
+		attribute.Bool(cachedKey, cached),
+		attribute.Bool(failedKey, failed)))
+}
diff --git a/share/eds/store.go b/share/eds/store.go
index cd85380ba1..2b2c37b26d 100644
--- a/share/eds/store.go
+++ b/share/eds/store.go
@@ -63,6 +63,8 @@ type Store struct {
 	gcInterval time.Duration
 	// lastGCResult is only stored on the store for testing purposes.
 	lastGCResult atomic.Pointer[dagstore.GCResult]
+
+	metrics *metrics
 }
 
 // NewStore creates a new EDS Store under the given basepath and datastore.
@@ -148,7 +150,9 @@ func (s *Store) gc(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		case <-ticker.C:
+			tnow := time.Now()
 			res, err := s.dgstr.GC(ctx)
+			s.metrics.observeGCtime(ctx, time.Since(tnow), err != nil)
 			if err != nil {
 				log.Errorf("garbage collecting dagstore: %v", err)
 				return
@@ -164,22 +168,30 @@
 // The square is verified on the Exchange level, and Put only stores the square, trusting it.
 // The resulting file stores all the shares and NMT Merkle Proofs of the EDS.
 // Additionally, the file gets indexed s.t. store.Blockstore can access them.
-func (s *Store) Put(ctx context.Context, root share.DataHash, square *rsmt2d.ExtendedDataSquare) (err error) {
-	// if root already exists, short-circuit
-	has, err := s.Has(ctx, root)
-	if err != nil {
-		return fmt.Errorf("failed to check if root already exists in index: %w", err)
-	}
-	if has {
-		return dagstore.ErrShardExists
-	}
-
+func (s *Store) Put(ctx context.Context, root share.DataHash, square *rsmt2d.ExtendedDataSquare) error {
 	ctx, span := tracer.Start(ctx, "store/put", trace.WithAttributes(
 		attribute.Int("width", int(square.Width())),
 	))
-	defer func() {
-		utils.SetStatusAndEnd(span, err)
-	}()
+
+	tnow := time.Now()
+	err := s.put(ctx, root, square)
+	result := putOK
+	switch {
+	case errors.Is(err, dagstore.ErrShardExists):
+		result = putExists
+	case err != nil:
+		result = putFailed
+	}
+	utils.SetStatusAndEnd(span, err)
+	s.metrics.observePut(ctx, time.Since(tnow), result, square.Width())
+	return err
+}
+
+func (s *Store) put(ctx context.Context, root share.DataHash, square *rsmt2d.ExtendedDataSquare) (err error) {
+	// if root already exists, short-circuit
+	if has, _ := s.Has(ctx, root); has {
+		return dagstore.ErrShardExists
+	}
 
 	key := root.String()
 	f, err := os.OpenFile(s.basepath+blocksPath+key, os.O_CREATE|os.O_WRONLY, 0600)
@@ -203,7 +215,8 @@ func (s *Store) Put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext
 
 	select {
 	case <-ctx.Done():
-		go logLateResult(ch, time.Minute*5)
+		// if context finished before result was received, track result in separate goroutine
+		go trackLateResult("put", ch, s.metrics, time.Minute*5)
 		return ctx.Err()
 	case result := <-ch:
 		if result.Error != nil {
@@ -217,21 +230,24 @@ func (s *Store) Put(ctx context.Context, root share.DataHash, square *rsmt2d.Ext
 // maxWait. If the result is not received within the specified duration, it logs an error
 // indicating that the parent context has expired and the shard registration is stuck. If a result
 // is received, it checks for any error and logs appropriate messages.
-func logLateResult(res <-chan dagstore.ShardResult, maxWait time.Duration) {
+func trackLateResult(opName string, res <-chan dagstore.ShardResult, metrics *metrics, maxWait time.Duration) {
 	tnow := time.Now()
 	select {
 	case <-time.After(maxWait):
+		metrics.observeLongOp(context.Background(), opName, time.Since(tnow), longOpUnresolved)
 		log.Errorf("parent context is expired, while register shard is stuck for more than %v sec", time.Since(tnow))
 		return
 	case result := <-res:
-		// don't log if result was received right after launch of the func
+		// don't observe if result was received right after launch of the func
 		if time.Since(tnow) < time.Second {
 			return
 		}
 		if result.Error != nil {
+			metrics.observeLongOp(context.Background(), opName, time.Since(tnow), longOpFailed)
 			log.Errorf("failed to register shard after context expired: %v ago, err: %w", time.Since(tnow), result.Error)
 			return
 		}
+		metrics.observeLongOp(context.Background(), opName, time.Since(tnow), longOpOK)
 		log.Warnf("parent context expired, but register shard finished with no error,"+
 			" after context expired: %v ago", time.Since(tnow))
 		return
@@ -247,8 +263,14 @@ func logLateResult(res <-chan dagstore.ShardResult, maxWait time.Duration) {
 // same reader. The cache is responsible for closing the underlying reader.
 func (s *Store) GetCAR(ctx context.Context, root share.DataHash) (io.Reader, error) {
 	ctx, span := tracer.Start(ctx, "store/get-car")
-	defer span.End()
+	tnow := time.Now()
+	r, err := s.getCAR(ctx, root)
+	s.metrics.observeGetCAR(ctx, time.Since(tnow), err != nil)
+	utils.SetStatusAndEnd(span, err)
+	return r, err
+}
 
+func (s *Store) getCAR(ctx context.Context, root share.DataHash) (io.Reader, error) {
 	key := root.String()
 	accessor, err := s.getCachedAccessor(ctx, shard.KeyFromString(key))
 	if err != nil {
@@ -272,6 +294,18 @@ func (s *Store) Blockstore() bstore.Blockstore {
 func (s *Store) CARBlockstore(
 	ctx context.Context,
 	root share.DataHash,
+) (dagstore.ReadBlockstore, error) {
+	ctx, span := tracer.Start(ctx, "store/car-blockstore")
+	tnow := time.Now()
+	r, err := s.carBlockstore(ctx, root)
+	s.metrics.observeCARBlockstore(ctx, time.Since(tnow), err != nil)
+	utils.SetStatusAndEnd(span, err)
+	return r, err
+}
+
+func (s *Store) carBlockstore(
+	ctx context.Context,
+	root share.DataHash,
 ) (dagstore.ReadBlockstore, error) {
 	key := shard.KeyFromString(root.String())
 	accessor, err := s.getCachedAccessor(ctx, key)
@@ -283,9 +317,15 @@ func (s *Store) CARBlockstore(
 
 // GetDAH returns the DataAvailabilityHeader for the EDS identified by DataHash.
 func (s *Store) GetDAH(ctx context.Context, root share.DataHash) (*share.Root, error) {
-	ctx, span := tracer.Start(ctx, "store/get-dah")
-	defer span.End()
+	ctx, span := tracer.Start(ctx, "store/car-dah")
+	tnow := time.Now()
+	r, err := s.getDAH(ctx, root)
+	s.metrics.observeGetDAH(ctx, time.Since(tnow), err != nil)
+	utils.SetStatusAndEnd(span, err)
+	return r, err
+}
 
+func (s *Store) getDAH(ctx context.Context, root share.DataHash) (*share.Root, error) {
 	key := shard.KeyFromString(root.String())
 	accessor, err := s.getCachedAccessor(ctx, key)
 	if err != nil {
@@ -334,6 +374,7 @@ func (s *Store) getAccessor(ctx context.Context, key shard.Key) (*dagstore.Shard
 		}
 		return res.Accessor, nil
 	case <-ctx.Done():
+		go trackLateResult("get_shard", ch, s.metrics, time.Minute)
 		return nil, ctx.Err()
 	}
 }
@@ -343,30 +384,40 @@ func (s *Store) getCachedAccessor(ctx context.Context, key shard.Key) (*accessor
 	lk.Lock()
 	defer lk.Unlock()
 
+	tnow := time.Now()
 	accessor, err := s.cache.unsafeGet(key)
 	if err != nil && err != errCacheMiss {
 		log.Errorf("unexpected error while reading key from bs cache %s: %s", key, err)
 	}
 	if accessor != nil {
+		s.metrics.observeGetAccessor(ctx, time.Since(tnow), true, false)
 		return accessor, nil
 	}
 
 	// wasn't found in cache, so acquire it and add to cache
 	shardAccessor, err := s.getAccessor(ctx, key)
 	if err != nil {
+		s.metrics.observeGetAccessor(ctx, time.Since(tnow), false, err != nil)
 		return nil, err
 	}
-	return s.cache.unsafeAdd(key, shardAccessor)
+
+	a, err := s.cache.unsafeAdd(key, shardAccessor)
+	s.metrics.observeGetAccessor(ctx, time.Since(tnow), false, err != nil)
+	return a, err
 }
 
 // Remove removes EDS from Store by the given share.Root hash and cleans up all
 // the indexing.
-func (s *Store) Remove(ctx context.Context, root share.DataHash) (err error) {
+func (s *Store) Remove(ctx context.Context, root share.DataHash) error {
 	ctx, span := tracer.Start(ctx, "store/remove")
-	defer func() {
-		utils.SetStatusAndEnd(span, err)
-	}()
+	tnow := time.Now()
+	err := s.remove(ctx, root)
+	s.metrics.observeRemove(ctx, time.Since(tnow), err != nil)
+	utils.SetStatusAndEnd(span, err)
+	return err
+}
 
+func (s *Store) remove(ctx context.Context, root share.DataHash) (err error) {
 	key := root.String()
 	ch := make(chan dagstore.ShardResult, 1)
 	err = s.dgstr.DestroyShard(ctx, shard.KeyFromString(key), ch, dagstore.DestroyOpts{})
@@ -380,6 +431,7 @@ func (s *Store) Remove(ctx context.Context, root share.DataHash) (err error) {
 			return fmt.Errorf("failed to destroy shard: %w", result.Error)
 		}
 	case <-ctx.Done():
+		go trackLateResult("remove", ch, s.metrics, time.Minute)
 		return ctx.Err()
 	}
 
@@ -402,7 +454,16 @@
 //
 // It reads only one quadrant(1/4) of the EDS and verifies the integrity of the stored data by
 // recomputing it.
-func (s *Store) Get(ctx context.Context, root share.DataHash) (eds *rsmt2d.ExtendedDataSquare, err error) {
+func (s *Store) Get(ctx context.Context, root share.DataHash) (*rsmt2d.ExtendedDataSquare, error) {
+	ctx, span := tracer.Start(ctx, "store/get")
+	tnow := time.Now()
+	eds, err := s.get(ctx, root)
+	s.metrics.observeGet(ctx, time.Since(tnow), err != nil)
+	utils.SetStatusAndEnd(span, err)
+	return eds, err
+}
+
+func (s *Store) get(ctx context.Context, root share.DataHash) (eds *rsmt2d.ExtendedDataSquare, err error) {
 	ctx, span := tracer.Start(ctx, "store/get")
 	defer func() {
 		utils.SetStatusAndEnd(span, err)
@@ -420,10 +481,16 @@ func (s *Store) Get(ctx context.Context, root share.DataHash) (eds *rsmt2d.Exten
 }
 
 // Has checks if EDS exists by the given share.Root hash.
-func (s *Store) Has(ctx context.Context, root share.DataHash) (bool, error) {
-	_, span := tracer.Start(ctx, "store/has")
-	defer span.End()
+func (s *Store) Has(ctx context.Context, root share.DataHash) (has bool, err error) {
+	ctx, span := tracer.Start(ctx, "store/has")
+	tnow := time.Now()
+	has, err = s.has(ctx, root)
+	s.metrics.observeHas(ctx, time.Since(tnow), err != nil)
+	utils.SetStatusAndEnd(span, err)
+	return has, err
+}
 
+func (s *Store) has(_ context.Context, root share.DataHash) (bool, error) {
 	key := root.String()
 	info, err := s.dgstr.GetShardInfo(shard.KeyFromString(key))
 	switch err {
@@ -438,6 +505,15 @@ func (s *Store) Has(ctx context.Context, root share.DataHash) (bool, error) {
 
 // List lists all the registered EDSes.
 func (s *Store) List() ([]share.DataHash, error) {
+	ctx, span := tracer.Start(context.Background(), "store/list")
+	tnow := time.Now()
+	hashes, err := s.list()
+	s.metrics.observeList(ctx, time.Since(tnow), err != nil)
+	utils.SetStatusAndEnd(span, err)
+	return hashes, err
+}
+
+func (s *Store) list() ([]share.DataHash, error) {
 	shards := s.dgstr.AllShardsInfo()
 	hashes := make([]share.DataHash, 0, len(shards))
 	for shrd := range shards {
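
Note on the instrumentation pattern used throughout this diff: every observe* helper is declared on a *metrics pointer and starts with a nil check, so a Store whose metrics field was never populated (i.e. WithMetrics was not wired in via fx.Invoke(share.WithStoreMetrics)) records nothing, and call sites never branch on whether metrics are enabled. The helpers also swap an already-expired request context for context.Background() so that late measurements are still recorded. Below is a minimal, self-contained sketch of that pattern; it is illustrative only, and the store/metrics types in it are hypothetical stand-ins, not celestia-node APIs:

package main

import (
	"context"
	"fmt"
	"time"
)

// metrics mirrors the shape used in share/eds/metrics.go: a pointer that
// stays nil unless instrumentation is explicitly enabled.
type metrics struct{ /* histograms would live here */ }

// observeGet is safe to call on a nil *metrics, exactly like the observers
// in the diff: a nil receiver makes the call a no-op.
func (m *metrics) observeGet(ctx context.Context, dur time.Duration, failed bool) {
	if m == nil {
		return // metrics disabled: WithMetrics was never invoked
	}
	if ctx.Err() != nil {
		// record against a fresh context so an expired request context
		// doesn't suppress the measurement
		ctx = context.Background()
	}
	// a real implementation would call histogram.Record(ctx, ...) here
	fmt.Printf("get took %v (failed=%t)\n", dur, failed)
}

// store is a hypothetical stand-in for eds.Store.
type store struct{ metrics *metrics }

func main() {
	s := &store{} // metrics nil: observe calls are no-ops
	s.metrics.observeGet(context.Background(), time.Millisecond, false)

	s.metrics = &metrics{} // as if s.WithMetrics() had been called
	s.metrics.observeGet(context.Background(), 2*time.Millisecond, true)
}

This is why the wrapped methods in store.go (Put, Get, Has, Remove, and the rest) can call s.metrics.observe* unconditionally: enabling metrics is purely additive and costs a single nil check when disabled.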