Skip to content

Commit

Permalink
Merge pull request #6 from dipdup-net/refactoring/change-orm
Browse files Browse the repository at this point in the history
Refactoring: change ORM, logging library and some optmization features
  • Loading branch information
aopoltorzhicky authored Dec 9, 2021
2 parents 87cdaa9 + e7b6cce commit 1cab313
Show file tree
Hide file tree
Showing 21 changed files with 691 additions and 735 deletions.
18 changes: 9 additions & 9 deletions cmd/api/main.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
package main

import (
"os"
"time"

"github.com/cenkalti/backoff"
"github.com/dipdup-net/go-lib/cmdline"
"github.com/dipdup-net/go-lib/config"
"github.com/elastic/go-elasticsearch/v8"
"github.com/labstack/echo"
"github.com/labstack/echo/middleware"
log "github.com/sirupsen/logrus"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

var es *elasticsearch.Client

func main() {
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
})
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})

args := cmdline.Parse()
if args.Help {
Expand All @@ -26,18 +26,18 @@ func main() {

cfg, err := config.Load(args.Config)
if err != nil {
log.Error(err)
log.Err(err).Msg("")
return
}

if cfg.Database.Kind != "elastic" {
log.Errorf("Invalid database kind: want=elastic got=%s", cfg.Database.Kind)
log.Error().Msgf("Invalid database kind: want=elastic got=%s", cfg.Database.Kind)
return
}

elastic, err := createElastic(cfg.Database.Path)
if err != nil {
log.Error(err)
log.Err(err).Msg("")
return
}
es = elastic
Expand Down
2 changes: 1 addition & 1 deletion cmd/api/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"log"

"github.com/labstack/echo"
"github.com/labstack/echo/v4"
"github.com/pkg/errors"
)

Expand Down
48 changes: 25 additions & 23 deletions cmd/metadata/contract.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package main

import (
"bytes"
"context"
stdJSON "encoding/json"
"fmt"
"time"
"unicode/utf8"

"github.com/dipdup-net/go-lib/tzkt/api"
"github.com/dipdup-net/metadata/cmd/metadata/helpers"
Expand Down Expand Up @@ -38,25 +40,25 @@ func (indexer *Indexer) processContractMetadata(update api.BigMapUpdate) (*model
}

func (indexer *Indexer) logContractMetadata(cm models.ContractMetadata, str, level string) {
entry := indexer.log().WithField("contract", cm.Contract).WithField("link", cm.Link)
entry := indexer.log().Str("contract", cm.Contract).Str("link", cm.Link)
switch level {
case "info":
entry.Info(str)
entry.Msg(str)
case "warn":
entry.Warn(str)
entry.Msg(str)
case "error":
entry.Error(str)
entry.Msg(str)
}
}

func (indexer *Indexer) resolveContractMetadata(ctx context.Context, cm *models.ContractMetadata) {
func (indexer *Indexer) resolveContractMetadata(ctx context.Context, cm *models.ContractMetadata) error {
indexer.logContractMetadata(*cm, "Trying to resolve", "info")
data, err := indexer.resolver.Resolve(ctx, cm.Network, cm.Contract, cm.Link)
if err != nil {
switch {
case errors.Is(err, resolver.ErrNoIPFSResponse) || errors.Is(err, resolver.ErrTezosStorageKeyNotFound):
cm.RetryCount += 1
if cm.RetryCount < int(indexer.settings.MaxRetryCountOnError) {
if cm.RetryCount < int8(indexer.settings.MaxRetryCountOnError) {
indexer.logContractMetadata(*cm, fmt.Sprintf("Retry: %s", err.Error()), "warn")
} else {
cm.Status = models.StatusFailed
Expand All @@ -71,23 +73,23 @@ func (indexer *Indexer) resolveContractMetadata(ctx context.Context, cm *models.
indexer.incrementErrorCounter(e)
}
} else {
cm.Metadata = helpers.Escape(data)
cm.Status = models.StatusApplied
escaped := helpers.Escape(data)

if utf8.Valid(escaped) {
cm.Status = models.StatusApplied

var dst bytes.Buffer
if err := stdJSON.Compact(&dst, escaped); err != nil {
cm.Metadata = escaped
} else {
cm.Metadata = dst.Bytes()
}
} else {
cm.Metadata = escaped
cm.Status = models.StatusFailed
}
}
cm.UpdateID = indexer.contractActionsCounter.Increment()
indexer.incrementCounter("contract", cm.Status)
}

func (indexer *Indexer) contractWorker(ctx context.Context, contract models.ContractMetadata) error {
resolveCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()

indexer.resolveContractMetadata(resolveCtx, &contract)

return indexer.db.UpdateContractMetadata(&contract, map[string]interface{}{
"status": contract.Status,
"metadata": contract.Metadata,
"retry_count": contract.RetryCount,
"update_id": contract.UpdateID,
})
return nil
}
40 changes: 21 additions & 19 deletions cmd/metadata/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ import (
"strings"
"sync"

"github.com/go-pg/pg/v10"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"gorm.io/gorm"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

generalConfig "github.com/dipdup-net/go-lib/config"
"github.com/dipdup-net/go-lib/database"
"github.com/dipdup-net/go-lib/prometheus"
"github.com/dipdup-net/go-lib/state"
"github.com/dipdup-net/metadata/cmd/metadata/config"
internalContext "github.com/dipdup-net/metadata/cmd/metadata/context"
"github.com/dipdup-net/metadata/cmd/metadata/helpers"
Expand All @@ -28,7 +30,7 @@ import (
type Indexer struct {
network string
indexName string
state state.State
state database.State
resolver resolver.Receiver
db models.Database
scanner *tzkt.Scanner
Expand Down Expand Up @@ -69,8 +71,8 @@ func NewIndexer(ctx context.Context, network string, indexerConfig *config.Index
if aws := storage.NewAWS(settings.AWS.AccessKey, settings.AWS.Secret, settings.AWS.Region, settings.AWS.BucketName); aws != nil {
indexer.thumbnail = thumbnail.New(aws, db, prom, network, settings.IPFSGateways, 10)
}
indexer.contracts = service.NewContractService(db, indexer.contractWorker)
indexer.tokens = service.NewTokenService(db, indexer.tokenWorker)
indexer.contracts = service.NewContractService(db, indexer.resolveContractMetadata)
indexer.tokens = service.NewTokenService(db, indexer.resolveTokenMetadata)

return indexer, nil
}
Expand Down Expand Up @@ -134,12 +136,12 @@ func (indexer *Indexer) Close() error {
}

func (indexer *Indexer) initState() error {
current, err := indexer.db.GetState(indexer.indexName)
current, err := indexer.db.State(indexer.indexName)
if err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
if !errors.Is(err, pg.ErrNoRows) {
return err
}
indexer.state = state.State{
indexer.state = database.State{
IndexType: models.IndexTypeMetadata,
IndexName: indexer.indexName,
}
Expand Down Expand Up @@ -169,8 +171,8 @@ func (indexer *Indexer) initCounters() error {
return nil
}

func (indexer *Indexer) log() *log.Entry {
return log.WithField("state", indexer.state.Level).WithField("name", indexer.indexName)
func (indexer *Indexer) log() *zerolog.Event {
return log.Info().Uint64("state", indexer.state.Level).Str("name", indexer.indexName)
}

func (indexer *Indexer) listen(ctx context.Context) {
Expand All @@ -181,25 +183,25 @@ func (indexer *Indexer) listen(ctx context.Context) {
case <-ctx.Done():
return
case msg := <-indexer.scanner.BigMaps():
if err := indexer.handlerUpdate(msg); err != nil {
log.Error(err)
if err := indexer.handlerUpdate(ctx, msg); err != nil {
log.Err(err)
} else {
indexer.log().Infof("New level %d", msg.Level)
indexer.log().Msg("New level")
}
case level := <-indexer.scanner.Blocks():
if level-indexer.state.Level > 1 {
indexer.state.Level = level - 1
if err := indexer.db.UpdateState(indexer.state); err != nil {
log.Error(err)
log.Err(err).Msg("")
} else {
indexer.log().Infof("New level %d", indexer.state.Level)
indexer.log().Msg("New level")
}
}
}
}
}

func (indexer *Indexer) handlerUpdate(msg tzkt.Message) error {
func (indexer *Indexer) handlerUpdate(ctx context.Context, msg tzkt.Message) error {
tokens := make([]*models.TokenMetadata, 0)
contracts := make([]*models.ContractMetadata, 0)
for i := range msg.Body {
Expand All @@ -225,10 +227,10 @@ func (indexer *Indexer) handlerUpdate(msg tzkt.Message) error {
}
}

if err := indexer.db.SaveContractMetadata(contracts); err != nil {
if err := indexer.db.SaveContractMetadata(ctx, contracts); err != nil {
return err
}
if err := indexer.db.SaveTokenMetadata(tokens); err != nil {
if err := indexer.db.SaveTokenMetadata(ctx, tokens); err != nil {
return err
}

Expand Down
28 changes: 15 additions & 13 deletions cmd/metadata/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (
"syscall"
"time"

log "github.com/sirupsen/logrus"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"

"github.com/dipdup-net/go-lib/cmdline"
golibConfig "github.com/dipdup-net/go-lib/config"
Expand All @@ -31,9 +32,10 @@ type startResult struct {
}

func main() {
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
})
log.Logger = log.Output(zerolog.ConsoleWriter{
Out: os.Stderr,
TimeFormat: "2006-01-02 15:04:05",
}).Level(zerolog.InfoLevel)

args := cmdline.Parse()
if args.Help {
Expand All @@ -42,7 +44,7 @@ func main() {

cfg, err := config.Load(args.Config)
if err != nil {
log.Error(err)
log.Err(err).Msg("")
return
}
runtime.GOMAXPROCS(int(cfg.Metadata.Settings.MaxCPU))
Expand All @@ -63,13 +65,13 @@ func main() {
go func(network string, ind *config.Indexer) {
result, err := startIndexer(ctx, cfg, *ind, network, prometheusService)
if err != nil {
log.Error(err)
log.Err(err).Msg("")
} else {
indexers[network] = result.indexer
indexerCancels[network] = result.cancel
hasuraInit.Do(func() {
if err := hasura.Create(ctx, cfg.Hasura, cfg.Database, nil, new(models.TokenMetadata), new(models.ContractMetadata)); err != nil {
log.Error(err)
log.Err(err).Msg("")
}
})
return
Expand All @@ -85,13 +87,13 @@ func main() {
case <-ticker.C:
result, err := startIndexer(ctx, cfg, *ind, network, prometheusService)
if err != nil {
log.Error(err)
log.Err(err).Msg("")
} else {
indexers[network] = result.indexer
indexerCancels[network] = result.cancel
hasuraInit.Do(func() {
if err := hasura.Create(ctx, cfg.Hasura, cfg.Database, nil, new(models.TokenMetadata), new(models.ContractMetadata)); err != nil {
log.Error(err)
log.Err(err).Msg("")
}
})
return
Expand All @@ -104,21 +106,21 @@ func main() {
<-signals

for newtork, cancelIndexer := range indexerCancels {
log.Infof("stopping %s indexer...", newtork)
log.Info().Msgf("stopping %s indexer...", newtork)
cancelIndexer()
}

log.Warn("Trying carefully stopping....")
log.Warn().Msgf("Trying carefully stopping....")
for _, indexer := range indexers {
if err := indexer.Close(); err != nil {
log.Error(err)
log.Err(err).Msg("")
return
}
}

if prometheusService != nil {
if err := prometheusService.Close(); err != nil {
log.Error(err)
log.Err(err).Msg("")
}
}

Expand Down
11 changes: 7 additions & 4 deletions cmd/metadata/models/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ const (

// ContextItem -
type ContextItem struct {
ID uint64 `gorm:"autoIncrement;not null;" json:"-"`
Network string `gorm:"primarykey"`
Address string `gorm:"primarykey"`
Key string `gorm:"primarykey"`
//nolint
tableName struct{} `pg:"dipdup_metadata_context"`

ID uint64 `gorm:"autoIncrement;not null;" json:"-" pg:",nopk,notnull"`
Network string `gorm:"primarykey" pg:",pk"`
Address string `gorm:"primarykey" pg:",pk"`
Key string `gorm:"primarykey" pg:",pk"`
Value []byte
}

Expand Down
Loading

0 comments on commit 1cab313

Please sign in to comment.