Skip to content

Commit

Permalink
Feature: new metadata gauge (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky authored Jan 14, 2022
1 parent 9de16a0 commit 4255707
Show file tree
Hide file tree
Showing 13 changed files with 126 additions and 8 deletions.
3 changes: 1 addition & 2 deletions cmd/metadata/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (indexer *Indexer) processContractMetadata(update api.BigMapUpdate) (*model
return nil, err
}

indexer.incrementCounter("contract", models.StatusNew)
indexer.incrementNewMetadataGauge("contract")

return &models.ContractMetadata{
Network: indexer.network,
Expand Down Expand Up @@ -73,7 +73,6 @@ func (indexer *Indexer) resolveContractMetadata(ctx context.Context, cm *models.
}
}
cm.UpdateID = indexer.contractActionsCounter.Increment()
indexer.incrementCounter("contract", cm.Status)

if resolved.By == resolver.ResolverTypeIPFS && cm.Status == models.StatusApplied {
if resolved.ResponseTime > 0 {
Expand Down
42 changes: 42 additions & 0 deletions cmd/metadata/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,13 @@ func NewIndexer(ctx context.Context, network string, indexerConfig *config.Index
db, indexer.resolveContractMetadata, network,
service.WithMaxRetryCountContract(settings.MaxRetryCountOnError),
service.WithWorkersCountContract(settings.ContractServiceWorkers),
service.WithPrometheusContract(prom),
)
indexer.tokens = service.NewTokenService(
db, indexer.resolveTokenMetadata, network,
service.WithMaxRetryCountToken(settings.MaxRetryCountOnError),
service.WithWorkersCountToken(settings.TokenServiceWorkers),
service.WithPrometheusToken(prom),
)

return indexer, nil
Expand All @@ -111,6 +113,26 @@ func (indexer *Indexer) Start(ctx context.Context) error {
indexer.thumbnail.Start(ctx)
}

if indexer.prom != nil {
newContractCount, err := indexer.db.CountContractsByStatus(indexer.network, models.StatusNew)
if err != nil {
return err
}
indexer.prom.SetGaugeValue(metricMetadataNew, map[string]string{
"network": indexer.network,
"type": "contract",
}, float64(newContractCount))

newTokenCount, err := indexer.db.CountTokensByStatus(indexer.network, models.StatusNew)
if err != nil {
return err
}
indexer.prom.SetGaugeValue(metricMetadataNew, map[string]string{
"network": indexer.network,
"type": "token",
}, float64(newTokenCount))
}

indexer.contracts.Start(ctx)
indexer.tokens.Start(ctx)

Expand Down Expand Up @@ -294,3 +316,23 @@ func (indexer *Indexer) addHistogramResponseTime(data resolver.Resolved) {
"node": data.Node,
}, float64(len(data.Data))/float64(data.ResponseTime))
}

func (indexer *Indexer) incrementNewMetadataGauge(typ string) {
if indexer.prom == nil {
return
}
indexer.prom.IncGaugeValue(metricMetadataNew, map[string]string{
"network": indexer.network,
"type": typ,
})
}

func (indexer *Indexer) decrementNewMetadataGauge(typ string) {
if indexer.prom == nil {
return
}
indexer.prom.DecGaugeValue(metricMetadataNew, map[string]string{
"network": indexer.network,
"type": typ,
})
}
2 changes: 2 additions & 0 deletions cmd/metadata/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

const (
metricMetadataCounter = "metadata_counter"
metricMetadataNew = "metadata_new"
metricsMetadataHttpErrors = "metadata_http_errors"
metricsMetadataMimeType = "metadata_mime_type"
metricsMetadataIPFSResponseTime = "metadata_ipfs_response_time"
Expand Down Expand Up @@ -134,6 +135,7 @@ func initPrometheus(cfg *golibConfig.Prometheus) *prometheus.Service {
prometheusService := prometheus.NewService(cfg)

prometheusService.RegisterGoBuildMetrics()
prometheusService.RegisterGauge(metricMetadataNew, "Count of new metadata", "type", "network")
prometheusService.RegisterCounter(metricMetadataCounter, "Count of metadata", "type", "status", "network")
prometheusService.RegisterCounter(metricsMetadataHttpErrors, "Count of HTTP errors in metadata", "network", "code", "type")
prometheusService.RegisterCounter(metricsMetadataMimeType, "Count of metadata mime types", "network", "mime")
Expand Down
1 change: 1 addition & 0 deletions cmd/metadata/models/contract_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ type ContractRepository interface {
UpdateContractMetadata(ctx context.Context, metadata []*ContractMetadata) error
SaveContractMetadata(ctx context.Context, metadata []*ContractMetadata) error
LastContractUpdateID() (int64, error)
CountContractsByStatus(network string, status Status) (int, error)
}
10 changes: 10 additions & 0 deletions cmd/metadata/models/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ func (db *RelativeDatabase) LastContractUpdateID() (updateID int64, err error) {
return
}

// CountContractsByStatus -
func (db *RelativeDatabase) CountContractsByStatus(network string, status Status) (int, error) {
return db.DB().Model(&ContractMetadata{}).Where("status = ?", status).Where("network = ?", network).Count()
}

// GetTokenMetadata -
func (db *RelativeDatabase) GetTokenMetadata(network string, status Status, limit, offset, retryCount int) (all []TokenMetadata, err error) {
query := db.DB().Model(&all).Where("status = ?", status).Where("network = ?", network)
Expand Down Expand Up @@ -171,6 +176,11 @@ func (db *RelativeDatabase) GetUnprocessedImage(from uint64, limit int) (all []T
return
}

// CountTokensByStatus -
func (db *RelativeDatabase) CountTokensByStatus(network string, status Status) (int, error) {
return db.DB().Model(&TokenMetadata{}).Where("status = ?", status).Where("network = ?", network).Count()
}

// CurrentContext -
func (db *RelativeDatabase) CurrentContext() (updates []ContextItem, err error) {
err = db.DB().Model(&updates).Select()
Expand Down
12 changes: 12 additions & 0 deletions cmd/metadata/models/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,12 @@ func (e *Elastic) LastContractUpdateID() (value int64, err error) {
return
}

// CountContractsByStatus -
func (e *Elastic) CountContractsByStatus(network string, status Status) (int, error) {
// TODO: realize CountContractsByStatus
return 0, nil
}

// GetContractMetadata -
// TODO: filter network
func (e *Elastic) GetTokenMetadata(network string, status Status, limit, offset, retryCount int) ([]TokenMetadata, error) {
Expand Down Expand Up @@ -339,6 +345,12 @@ func (e *Elastic) LastTokenUpdateID() (value int64, err error) {
return
}

// CountTokensByStatus -
func (e *Elastic) CountTokensByStatus(network string, status Status) (int, error) {
// TODO: realize CountTokensByStatus
return 0, nil
}

// SetImageProcessed -
func (e *Elastic) SetImageProcessed(token TokenMetadata) error {
token.UpdatedAt = time.Now().Unix()
Expand Down
1 change: 1 addition & 0 deletions cmd/metadata/models/token_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,5 @@ type TokenRepository interface {
UpdateTokenMetadata(ctx context.Context, metadata []*TokenMetadata) error
SaveTokenMetadata(ctx context.Context, metadata []*TokenMetadata) error
LastTokenUpdateID() (int64, error)
CountTokensByStatus(network string, status Status) (int, error)
}
17 changes: 17 additions & 0 deletions cmd/metadata/service/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/dipdup-net/go-lib/prometheus"
"github.com/dipdup-net/metadata/cmd/metadata/models"
"github.com/dipdup-net/metadata/internal/ipfs"
"github.com/go-pg/pg/v10"
Expand All @@ -19,6 +20,7 @@ type ContractService struct {
workersCount int
db models.Database
handler func(ctx context.Context, contract *models.ContractMetadata) error
prom *prometheus.Service
tasks chan *models.ContractMetadata
result chan *models.ContractMetadata
wg sync.WaitGroup
Expand Down Expand Up @@ -138,6 +140,21 @@ func (s *ContractService) saver(ctx context.Context) {
case contract := <-s.result:
contracts = append(contracts, contract)

if s.prom != nil {
switch contract.Status {
case models.StatusApplied, models.StatusFailed:
s.prom.DecGaugeValue("metadata_new", map[string]string{
"network": s.network,
"type": "contract",
})
s.prom.IncrementCounter("metadata_counter", map[string]string{
"network": s.network,
"type": "contract",
"status": contract.Status.String(),
})
}
}

if len(contracts) == 8 {
if err := s.db.UpdateContractMetadata(ctx, contracts); err != nil {
log.Err(err).Msg("UpdateContractMetadata")
Expand Down
16 changes: 16 additions & 0 deletions cmd/metadata/service/options.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package service

import "github.com/dipdup-net/go-lib/prometheus"

// ContractServiceOption -
type ContractServiceOption func(*ContractService)

Expand All @@ -21,6 +23,13 @@ func WithWorkersCountContract(count int) ContractServiceOption {
}
}

// WithWorkersCountContract -
func WithPrometheusContract(prom *prometheus.Service) ContractServiceOption {
return func(cs *ContractService) {
cs.prom = prom
}
}

// TokenServiceOption -
type TokenServiceOption func(*TokenService)

Expand All @@ -41,3 +50,10 @@ func WithWorkersCountToken(count int) TokenServiceOption {
}
}
}

// WithPrometheusToken -
func WithPrometheusToken(prom *prometheus.Service) TokenServiceOption {
return func(ts *TokenService) {
ts.prom = prom
}
}
17 changes: 17 additions & 0 deletions cmd/metadata/service/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/dipdup-net/go-lib/prometheus"
"github.com/dipdup-net/metadata/cmd/metadata/models"
"github.com/dipdup-net/metadata/internal/ipfs"
"github.com/go-pg/pg/v10"
Expand All @@ -19,6 +20,7 @@ type TokenService struct {
workersCount int
repo models.Database
handler func(ctx context.Context, token *models.TokenMetadata) error
prom *prometheus.Service
tasks chan *models.TokenMetadata
result chan *models.TokenMetadata
wg sync.WaitGroup
Expand Down Expand Up @@ -127,6 +129,21 @@ func (s *TokenService) saver(ctx context.Context) {
case token := <-s.result:
tokens = append(tokens, token)

if s.prom != nil {
switch token.Status {
case models.StatusApplied, models.StatusFailed:
s.prom.DecGaugeValue("metadata_new", map[string]string{
"network": s.network,
"type": "token",
})
s.prom.IncrementCounter("metadata_counter", map[string]string{
"network": s.network,
"type": "token",
"status": token.Status.String(),
})
}
}

if len(tokens) == 8 {
if err := s.repo.UpdateTokenMetadata(ctx, tokens); err != nil {
log.Err(err).Msg("UpdateTokenMetadata")
Expand Down
7 changes: 4 additions & 3 deletions cmd/metadata/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,16 @@ func (indexer *Indexer) processTokenMetadata(update api.BigMapUpdate) (*models.T
UpdateID: indexer.tokenActionsCounter.Increment(),
}

indexer.incrementNewMetadataGauge("token")

if _, err := url.ParseRequestURI(tokenInfo.Link); err != nil {
token.Status = models.StatusApplied
indexer.incrementCounter("token", token.Status)
indexer.decrementNewMetadataGauge("token")
} else {
token.Link = tokenInfo.Link
}

indexer.incrementCounter("token", token.Status)

return &token, nil
}

Expand Down Expand Up @@ -155,7 +157,6 @@ func (indexer *Indexer) resolveTokenMetadata(ctx context.Context, tm *models.Tok
}
}

indexer.incrementCounter("token", tm.Status)
tm.UpdateID = indexer.tokenActionsCounter.Increment()

if resolved.By == resolver.ResolverTypeIPFS && tm.Status == models.StatusApplied {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/aws/aws-sdk-go v1.42.20
github.com/btcsuite/btcd v0.22.0-beta // indirect
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/dipdup-net/go-lib v0.1.50
github.com/dipdup-net/go-lib v0.1.51
github.com/disintegration/imaging v1.6.2
github.com/elastic/go-elasticsearch/v8 v8.0.0-20211123103400-5f8a17a2322f
github.com/go-pg/pg/v10 v10.10.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218=
github.com/dipdup-net/go-lib v0.1.50 h1:NNPyP7NNMbnXVhfuQiNigm5WiyKtkpwsRxNnWQDfcxc=
github.com/dipdup-net/go-lib v0.1.50/go.mod h1:ZOAyo2sqNLCCVdvHvPbiWfgImvmrMW7/sdQiSsGrD44=
github.com/dipdup-net/go-lib v0.1.51 h1:SjyaVoivUuNEFNPX+URZJ/xVJFCmiJl/eXLE8GNPRT8=
github.com/dipdup-net/go-lib v0.1.51/go.mod h1:ZOAyo2sqNLCCVdvHvPbiWfgImvmrMW7/sdQiSsGrD44=
github.com/disintegration/imaging v1.6.2 h1:w1LecBlG2Lnp8B3jk5zSuNqd7b4DXhcjwek1ei82L+c=
github.com/disintegration/imaging v1.6.2/go.mod h1:44/5580QXChDfwIclfc/PCwrr44amcmDAg8hxG0Ewe4=
github.com/ebellocchia/go-base58 v0.1.0/go.mod h1:RHE/6C6Ru6YAH9Tc+A9eHQ6ZKEooLC0jw+YLnpt3CAU=
Expand Down

0 comments on commit 4255707

Please sign in to comment.