Skip to content

Commit

Permalink
Merge pull request #966 from iotaledger/fix/prometheus-init-data-race
Browse files Browse the repository at this point in the history
Fix prometheus init data race
  • Loading branch information
muXxer authored May 8, 2024
2 parents 521e90a + 0fd1f01 commit b390435
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 60 deletions.
26 changes: 15 additions & 11 deletions components/prometheus/collector/collection.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,41 @@
package collector

import (
"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/runtime/options"
)

type Collection struct {
CollectionName string
metrics map[string]*Metric
metrics *shrinkingmap.ShrinkingMap[string, *Metric]
}

func NewCollection(name string, opts ...options.Option[Collection]) *Collection {
return options.Apply(&Collection{
CollectionName: name,
metrics: make(map[string]*Metric),
}, opts, func(c *Collection) {
for _, m := range c.metrics {
m.Namespace = c.CollectionName
m.initPromMetric()
}
metrics: shrinkingmap.New[string, *Metric](),
}, opts, func(collection *Collection) {
collection.metrics.ForEach(func(_ string, metric *Metric) bool {
metric.Namespace = collection.CollectionName
metric.initPromMetric()

return true
})
})
}

func (c *Collection) GetMetric(metricName string) *Metric {
if metric, exists := c.metrics[metricName]; exists {
return metric
metric, exists := c.metrics.Get(metricName)
if !exists {
return nil
}

return nil
return metric
}

func (c *Collection) addMetric(metric *Metric) {
if metric != nil {
c.metrics[metric.Name] = metric
c.metrics.Set(metric.Name, metric)
}
}

Expand Down
57 changes: 34 additions & 23 deletions components/prometheus/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,50 @@ package collector

import (
"github.com/prometheus/client_golang/prometheus"

"github.com/iotaledger/hive.go/ds/shrinkingmap"
)

// Collector is responsible for creation and collection of metrics for the prometheus.
type Collector struct {
Registry *prometheus.Registry
collections map[string]*Collection
collections *shrinkingmap.ShrinkingMap[string, *Collection]
}

// New creates an instance of Manager and creates a new prometheus registry for the protocol metrics collection.
func New() *Collector {
return &Collector{
Registry: prometheus.NewRegistry(),
collections: make(map[string]*Collection),
collections: shrinkingmap.New[string, *Collection](),
}
}

func (c *Collector) RegisterCollection(coll *Collection) {
c.collections[coll.CollectionName] = coll
for _, m := range coll.metrics {
c.Registry.MustRegister(m.promMetric)
if m.initValueFunc != nil {
metricValue, labelValues := m.initValueFunc()
m.update(metricValue, labelValues...)
func (c *Collector) RegisterCollection(collection *Collection) {
c.collections.Set(collection.CollectionName, collection)
collection.metrics.ForEach(func(_ string, metric *Metric) bool {
c.Registry.MustRegister(metric.promMetric)
if metric.initValueFunc != nil {
metricValue, labelValues := metric.initValueFunc()
metric.update(metricValue, labelValues...)
}
if m.initFunc != nil {
m.initFunc()
if metric.initFunc != nil {
metric.initFunc()
}
}

return true
})
}

// Collect collects all metrics from the registered collections.
func (c *Collector) Collect() {
for _, collection := range c.collections {
for _, metric := range collection.metrics {
c.collections.ForEach(func(_ string, collection *Collection) bool {
collection.metrics.ForEach(func(_ string, metric *Metric) bool {
metric.collect()
}
}
return true
})

return true
})
}

// Update updates the value of the existing metric defined by the subsystem and metricName.
Expand Down Expand Up @@ -78,11 +85,14 @@ func (c *Collector) ResetMetric(namespace string, metricName string) {
}

func (c *Collector) Shutdown() {
for _, collection := range c.collections {
for _, metric := range collection.metrics {
c.collections.ForEach(func(_ string, collection *Collection) bool {
collection.metrics.ForEach(func(_ string, metric *Metric) bool {
metric.shutdown()
}
}
return true
})

return true
})
}

func (c *Collector) getMetric(subsystem string, metricName string) *Metric {
Expand All @@ -95,9 +105,10 @@ func (c *Collector) getMetric(subsystem string, metricName string) *Metric {
}

func (c *Collector) getCollection(subsystem string) *Collection {
if collection, exists := c.collections[subsystem]; exists {
return collection
collection, exists := c.collections.Get(subsystem)
if !exists {
return nil
}

return nil
return collection
}
47 changes: 24 additions & 23 deletions components/prometheus/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ import (

func init() {
Component = &app.Component{
Name: "Prometheus",
DepsFunc: func(cDeps dependencies) { deps = cDeps },
Params: params,
Provide: provide,
Run: run,
Name: "Prometheus",
DepsFunc: func(cDeps dependencies) { deps = cDeps },
Params: params,
Provide: provide,
Configure: configure,
Run: run,
IsEnabled: func(_ *dig.Container) bool {
return ParamsMetrics.Enabled
},
Expand All @@ -56,17 +57,32 @@ type dependencies struct {
Collector *collector.Collector
}

func run() error {
Component.LogInfo("Starting Prometheus exporter ...")
func provide(c *dig.Container) error {
return c.Provide(collector.New)
}

func configure() error {
if ParamsMetrics.GoMetrics {
deps.Collector.Registry.MustRegister(collectors.NewGoCollector())
}
if ParamsMetrics.ProcessMetrics {
deps.Collector.Registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
}

registerMetrics()
deps.Collector.RegisterCollection(TangleMetrics)
deps.Collector.RegisterCollection(ConflictMetrics)
deps.Collector.RegisterCollection(InfoMetrics)
deps.Collector.RegisterCollection(DBMetrics)
deps.Collector.RegisterCollection(CommitmentsMetrics)
deps.Collector.RegisterCollection(SlotMetrics)
deps.Collector.RegisterCollection(AccountMetrics)
deps.Collector.RegisterCollection(SchedulerMetrics)

return nil
}

func run() error {
Component.LogInfo("Starting Prometheus exporter ...")

return Component.Daemon().BackgroundWorker("Prometheus exporter", func(ctx context.Context) {
Component.LogInfo("Starting Prometheus exporter ... done")
Expand Down Expand Up @@ -118,18 +134,3 @@ func run() error {
Component.LogInfo("Stopping Prometheus exporter ... done")
}, daemon.PriorityMetrics)
}

func provide(c *dig.Container) error {
return c.Provide(collector.New)
}

func registerMetrics() {
deps.Collector.RegisterCollection(TangleMetrics)
deps.Collector.RegisterCollection(ConflictMetrics)
deps.Collector.RegisterCollection(InfoMetrics)
deps.Collector.RegisterCollection(DBMetrics)
deps.Collector.RegisterCollection(CommitmentsMetrics)
deps.Collector.RegisterCollection(SlotMetrics)
deps.Collector.RegisterCollection(AccountMetrics)
deps.Collector.RegisterCollection(SchedulerMetrics)
}
4 changes: 2 additions & 2 deletions pkg/toolset/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func benchmarkIO(args []string) error {
ts := time.Now()

lastStatusTime := time.Now()
for i := 0; i < objectCnt; i++ {
for i := range objectCnt {
// one read operation and one write operation per cycle
batchWriter.Enqueue(newBenchmarkObject(store, writeDoneWaitGroup, iotago_tpkg.RandBytes(32), iotago_tpkg.RandBytes(size)))

Expand Down Expand Up @@ -154,7 +154,7 @@ func benchmarkCPU(args []string) error {
}
}()

for i := 0; i < numWorkers; i++ {
for range numWorkers {
wg.Add(1)
go func() {
defer wg.Done()
Expand Down
2 changes: 1 addition & 1 deletion pkg/toolset/pwd_hash.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//nolint:err113
package toolset

import (
Expand Down Expand Up @@ -128,7 +129,6 @@ func hashPasswordAndSalt(args []string) error {
}

if *outputJSONFlag {

result := struct {
Password string `json:"passwordHash"`
Salt string `json:"passwordSalt"`
Expand Down
1 change: 1 addition & 0 deletions pkg/toolset/toolset.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const (
)

const (
//nolint:gosec // there is no hardcoded password
passwordEnvKey = "IOTA_CORE_TOOL_PASSWORD"

// printStatusInterval is the interval for printing status messages.
Expand Down

0 comments on commit b390435

Please sign in to comment.