Skip to content

Commit

Permalink
Merge pull request #20 from dipdup-io/pyroscope
Browse files Browse the repository at this point in the history
Feature: add pyroscope
  • Loading branch information
aopoltorzhicky authored Oct 13, 2023
2 parents 6df6292 + f1522ef commit 986997d
Show file tree
Hide file tree
Showing 12 changed files with 160 additions and 75 deletions.
2 changes: 1 addition & 1 deletion build/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# ---------------------------------------------------------------------
# The first stage container, for building the application
# ---------------------------------------------------------------------
FROM golang:1.19-alpine as builder
FROM golang:1.21-alpine as builder

ENV CGO_ENABLED=1
ENV GO111MODULE=on
Expand Down
4 changes: 4 additions & 0 deletions build/dipdup.ghostnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,7 @@ datasources:
ghostnet_rpc:
kind: tezos-node
url: https://rpc.tzkt.io/ghostnet

profiler:
server: ${PROFILER_SERVER}
project: tezos-ghostnet-mempool
6 changes: 5 additions & 1 deletion build/dipdup.mainnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,8 @@ datasources:
url: https://api.tzkt.io
mainnet_rpc:
kind: tezos-node
url: https://rpc.tzkt.io/mainnet
url: https://rpc.tzkt.io/mainnet

profiler:
server: ${PROFILER_SERVER}
project: tezos-mainnet-mempool
4 changes: 4 additions & 0 deletions build/dipdup.testnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,7 @@ datasources:
oxfordnet_rpc:
kind: tezos-node
url: https://rpc.tzkt.io/oxfordnet

profiler:
server: ${PROFILER_SERVER}
project: tezos-testnet-mempool
4 changes: 3 additions & 1 deletion cmd/mempool/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package config

import (
"github.com/dipdup-net/go-lib/config"
"github.com/dipdup-net/mempool/cmd/mempool/profiler"
)

// Config
type Config struct {
config.Config `yaml:",inline"`
Mempool Mempool `yaml:"mempool" validate:"required"`
Mempool Mempool `yaml:"mempool" validate:"required"`
Profiler *profiler.Config `yaml:"profiler,omitempty"`
}

// Mempool -
Expand Down
4 changes: 2 additions & 2 deletions cmd/mempool/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (indexer *Indexer) inChainOperationProcess(tx pg.DBI, operations tzkt.Opera
return false
}
if err := models.SetInChain(tx, indexer.network, apiOperation.Hash, apiOperation.Type, operations.Level); err != nil {
indexer.error().Err(err).Msg("models.SetInChain")
indexer.error(err).Msg("models.SetInChain")
return false
}

Expand All @@ -113,7 +113,7 @@ func (indexer *Indexer) inChainOperationProcess(tx pg.DBI, operations tzkt.Opera
gasStats.TotalFee = *apiOperation.BakerFee
}
if err := gasStats.Save(tx); err != nil {
indexer.error().Err(err).Msg("gasStats.Save")
indexer.error(err).Msg("gasStats.Save")
return false
}
}
Expand Down
32 changes: 17 additions & 15 deletions cmd/mempool/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Indexer struct {
rights *ccache.Cache
delegates *CachedDelegates
state *database.State
logger zerolog.Logger
filters config.Filters
endorsements chan *models.Endorsement
network string
Expand Down Expand Up @@ -103,6 +104,7 @@ func NewIndexer(ctx context.Context, network string, indexerCfg config.Indexer,
gasStatsLifetime: gasStatsLifetime,
endorsements: make(chan *models.Endorsement, 1024*32),
rights: ccache.New(ccache.Configure().MaxSize(60)),
logger: log.Logger.With().Str("network", network).Logger(),
}
indexer.cache.Start(ctx)

Expand Down Expand Up @@ -134,7 +136,7 @@ func NewIndexer(ctx context.Context, network string, indexerCfg config.Indexer,
func (indexer *Indexer) Start(ctx context.Context) error {
indexer.info().Strs("kinds", indexer.filters.Kinds).Msg("starting...")

if err := indexer.initState(); err != nil {
if err := indexer.initState(ctx); err != nil {
return err
}

Expand All @@ -152,7 +154,7 @@ func (indexer *Indexer) Start(ctx context.Context) error {
for {
endorsements, err := models.EndorsementsWithoutBaker(indexer.db.DB(), indexer.network, 100, offset)
if err != nil {
log.Err(err).Msg("")
indexer.error(err).Msg("get endorsements without baker")
break
}
for i := range endorsements {
Expand Down Expand Up @@ -189,7 +191,7 @@ func (indexer *Indexer) sync(ctx context.Context) {

}

func (indexer *Indexer) initState() error {
func (indexer *Indexer) initState(ctx context.Context) error {
current, err := indexer.db.State(indexer.indexName)
switch {
case err == nil:
Expand Down Expand Up @@ -242,20 +244,20 @@ func (indexer *Indexer) listen(ctx context.Context) {
return
case operations := <-indexer.tzkt.Operations():
if err := indexer.handleInChain(ctx, operations); err != nil {
indexer.error().Err(err).Msg("handleInChain")
indexer.error(err).Msg("handleInChain")
continue
}
case block := <-indexer.tzkt.Blocks():
if err := indexer.handleBlock(ctx, block); err != nil {
indexer.error().Err(err).Msg("handleBlock")
indexer.error(err).Msg("handleBlock")
continue
}
case msg := <-indexer.mempool.Operations():
switch msg.Status {
case receiver.StatusApplied:
applied, ok := msg.Body.(node.Applied)
if !ok {
indexer.error().Msgf("invalid applied operation %v", applied)
indexer.error(nil).Msgf("invalid applied operation %v", applied)
continue
}
if !indexer.branches.Contains(applied.Branch) {
Expand All @@ -265,13 +267,13 @@ func (indexer *Indexer) listen(ctx context.Context) {
continue
}
if err := indexer.handleAppliedOperation(ctx, applied, msg.Protocol); err != nil {
log.Err(err).Msg("handleAppliedOperation")
indexer.error(err).Msg("handleAppliedOperation")
continue
}
case receiver.StatusBranchDelayed, receiver.StatusBranchRefused, receiver.StatusRefused, receiver.StatusUnprocessed, receiver.StatusOutdated:
failed, ok := msg.Body.(node.FailedMonitor)
if !ok {
indexer.error().Msgf("invalid %s operation %v", msg.Status, failed)
indexer.error(nil).Msgf("invalid %s operation %v", msg.Status, failed)
continue
}

Expand All @@ -282,11 +284,11 @@ func (indexer *Indexer) listen(ctx context.Context) {
continue
}
if err := indexer.handleFailedOperation(ctx, failed, string(msg.Status), msg.Protocol); err != nil {
indexer.error().Err(err).Msg("handleFailedOperation")
indexer.error(err).Msg("handleFailedOperation")
continue
}
default:
indexer.error().Msgf("invalid mempool operation status %s", msg.Status)
indexer.error(nil).Msgf("invalid mempool operation status %s", msg.Status)
}
}
}
Expand All @@ -307,7 +309,7 @@ func (indexer *Indexer) onPopBlockQueue(block Block) error {
}

func (indexer *Indexer) onRollbackBlockQueue(ctx context.Context, block Block) error {
log.Warn().Msgf("Rollback to %d level", block.Level)
indexer.warn().Msgf("Rollback to %d level", block.Level)
indexer.state.Level = block.Level
indexer.state.Timestamp = block.Timestamp

Expand All @@ -320,14 +322,14 @@ func (indexer *Indexer) onRollbackBlockQueue(ctx context.Context, block Block) e

}

func (indexer *Indexer) error() *zerolog.Event {
return log.Error().Uint64("state", indexer.state.Level).Str("name", indexer.indexName)
func (indexer *Indexer) error(err error) *zerolog.Event {
return indexer.logger.Err(err).Uint64("state", indexer.state.Level)
}

func (indexer *Indexer) info() *zerolog.Event {
return log.Info().Uint64("state", indexer.state.Level).Str("name", indexer.indexName)
return indexer.logger.Info().Uint64("state", indexer.state.Level)
}

func (indexer *Indexer) warn() *zerolog.Event {
return log.Warn().Uint64("state", indexer.state.Level).Str("name", indexer.indexName)
return indexer.logger.Warn().Uint64("state", indexer.state.Level)
}
28 changes: 23 additions & 5 deletions cmd/mempool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"syscall"
"time"

"github.com/grafana/pyroscope-go"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/dipdup-net/go-lib/prometheus"
"github.com/dipdup-net/mempool/cmd/mempool/config"
"github.com/dipdup-net/mempool/cmd/mempool/models"
"github.com/dipdup-net/mempool/cmd/mempool/profiler"
)

type startResult struct {
Expand Down Expand Up @@ -52,7 +54,7 @@ func main() {

var cfg config.Config
if err := libCfg.Parse(*configPath, &cfg); err != nil {
log.Err(err).Msg("")
log.Err(err).Msg("parse config")
return
}

Expand All @@ -61,6 +63,16 @@ func main() {

ctx, cancel := context.WithCancel(context.Background())

var prscp *pyroscope.Profiler
if cfg.Profiler != nil && cfg.Profiler.Server != "" {
p, err := profiler.New(cfg.Profiler, "indexer")
if err != nil {
log.Err(err).Msg("create profiler")
return
}
prscp = p
}

var prometheusService *prometheus.Service
if cfg.Prometheus != nil {
prometheusService = prometheus.NewService(cfg.Prometheus)
Expand Down Expand Up @@ -110,7 +122,7 @@ func main() {
if err == nil {
return
}
log.Err(err).Msg("")
log.Err(err).Msg("start indexer")

ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
Expand All @@ -124,7 +136,7 @@ func main() {
if err == nil {
return
}
log.Err(err).Msg("")
log.Err(err).Msg("start indexer")
}
}
}(network, mempool)
Expand All @@ -134,7 +146,7 @@ func main() {

views, err := createViews(ctx, cfg.Database)
if err != nil {
log.Err(err).Msg("")
log.Err(err).Msg("creating views")
cancel()
return
}
Expand Down Expand Up @@ -171,7 +183,13 @@ func main() {

if prometheusService != nil {
if err := prometheusService.Close(); err != nil {
log.Err(err).Msg("")
log.Err(err).Msg("stopping prometheus")
}
}

if prscp != nil {
if err := prscp.Stop(); err != nil {
log.Panic().Err(err).Msg("stopping pyroscope")
}
}

Expand Down
44 changes: 44 additions & 0 deletions cmd/mempool/profiler/profiler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package profiler

import (
"fmt"
"runtime"

"github.com/grafana/pyroscope-go"
)

type Config struct {
Server string `validate:"omitempty,http_url" yaml:"server"`
Project string `validate:"omitempty" yaml:"project"`
}

func New(cfg *Config, service string) (*pyroscope.Profiler, error) {
if cfg == nil || cfg.Server == "" {
return nil, nil
}

runtime.SetMutexProfileFraction(5)
runtime.SetBlockProfileRate(5)

return pyroscope.Start(pyroscope.Config{
ApplicationName: fmt.Sprintf("%s-%s", cfg.Project, service),
ServerAddress: cfg.Server,
Tags: map[string]string{
"project": cfg.Project,
"service": service,
},

ProfileTypes: []pyroscope.ProfileType{
pyroscope.ProfileCPU,
pyroscope.ProfileAllocObjects,
pyroscope.ProfileAllocSpace,
pyroscope.ProfileInuseObjects,
pyroscope.ProfileInuseSpace,
pyroscope.ProfileGoroutines,
pyroscope.ProfileMutexCount,
pyroscope.ProfileMutexDuration,
pyroscope.ProfileBlockCount,
pyroscope.ProfileBlockDuration,
},
})
}
16 changes: 8 additions & 8 deletions cmd/mempool/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,13 @@ func (indexer *Receiver) updateState(ctx context.Context, url string) {
defer ticker.Stop()

// init
if err := indexer.setState(); err != nil {
log.Err(err).Msg("")
if err := indexer.setState(ctx); err != nil {
log.Err(err).Msg("set state")
}

rpc := node.NewMainRPC(url)
if err := indexer.checkHead(ctx, rpc); err != nil {
log.Err(err).Msg("")
log.Err(err).Msg("check head")
}

for {
Expand All @@ -192,22 +192,22 @@ func (indexer *Receiver) updateState(ctx context.Context, url string) {
return
case <-ticker.C:
if err := indexer.checkHead(ctx, rpc); err != nil {
log.Err(err).Msg("")
log.Err(err).Msg("check head")
continue
}
if err := indexer.setState(); err != nil {
log.Err(err).Msg("")
if err := indexer.setState(ctx); err != nil {
log.Err(err).Msg("set state")
continue
}
}
}
}

func (indexer *Receiver) setState() error {
func (indexer *Receiver) setState(ctx context.Context) error {
state, err := indexer.db.State(indexer.indexName)
if err != nil {
if errors.Is(err, pg.ErrNoRows) {
indexer.state = &database.State{}
indexer.state = new(database.State)
return nil
}
return err
Expand Down
Loading

0 comments on commit 986997d

Please sign in to comment.