Skip to content

Commit

Permalink
Gracefull stop and beutify logs
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Dec 14, 2022
1 parent dbf0864 commit dfd5c80
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 59 deletions.
22 changes: 9 additions & 13 deletions cmd/metadata/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/dipdup-net/metadata/cmd/metadata/helpers"
"github.com/dipdup-net/metadata/cmd/metadata/models"
"github.com/dipdup-net/metadata/cmd/metadata/resolver"
"github.com/pkg/errors"
)

func (indexer *Indexer) processContractMetadata(update api.BigMapUpdate) (*models.ContractMetadata, error) {
Expand All @@ -32,35 +33,30 @@ func (indexer *Indexer) processContractMetadata(update api.BigMapUpdate) (*model
}, nil
}

func (indexer *Indexer) logContractMetadata(cm models.ContractMetadata, str, level string) {
entry := indexer.log().Str("contract", cm.Contract).Str("link", cm.Link)
switch level {
case "info":
entry.Msg(str)
case "warn":
entry.Msg(str)
case "error":
entry.Msg(str)
}
func (indexer *Indexer) logContractMetadata(cm models.ContractMetadata, str string) {
indexer.log().Str("contract", cm.Contract).Str("link", cm.Link).Msg(str)
}

func (indexer *Indexer) resolveContractMetadata(ctx context.Context, cm *models.ContractMetadata) error {
indexer.logContractMetadata(*cm, "Trying to resolve", "info")
indexer.logContractMetadata(*cm, "trying to resolve")
cm.RetryCount += 1

resolved, err := indexer.resolver.Resolve(ctx, cm.Network, cm.Contract, cm.Link)
if err != nil {
if errors.Is(err, context.Canceled) {
return err
}
cm.Error = err.Error()
if e, ok := err.(resolver.ResolvingError); ok {
indexer.prom.IncrementErrorCounter(indexer.network, e)
err = e.Err
}

if cm.RetryCount < int8(indexer.settings.MaxRetryCountOnError) {
indexer.logContractMetadata(*cm, fmt.Sprintf("Retry: %s", err.Error()), "warn")
indexer.logContractMetadata(*cm, fmt.Sprintf("retry: %s", err.Error()))
} else {
cm.Status = models.StatusFailed
indexer.logContractMetadata(*cm, "Failed", "warn")
indexer.logContractMetadata(*cm, "failed")
}
} else {
cm.Metadata = resolved.Data
Expand Down
7 changes: 5 additions & 2 deletions cmd/metadata/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Indexer struct {
settings config.Settings
filters config.Filters

wg sync.WaitGroup
wg *sync.WaitGroup
}

// NewIndexer -
Expand Down Expand Up @@ -73,6 +73,7 @@ func NewIndexer(ctx context.Context, network string, indexerConfig *config.Index
db: db,
prom: prom,
filters: filters,
wg: new(sync.WaitGroup),
}

if aws := storage.NewAWS(settings.AWS); aws != nil {
Expand Down Expand Up @@ -154,6 +155,7 @@ func (indexer *Indexer) Start(ctx context.Context) error {

// Close -
func (indexer *Indexer) Close() error {
indexer.log().Msg("closing indexer...")
indexer.wg.Wait()

if err := indexer.scanner.Close(); err != nil {
Expand All @@ -178,6 +180,7 @@ func (indexer *Indexer) Close() error {
return err
}

indexer.log().Msg("indexer is closed")
return nil
}

Expand Down Expand Up @@ -239,7 +242,7 @@ func (indexer *Indexer) listen(ctx context.Context) {
indexer.log().Msg("New level")
}
case block := <-indexer.scanner.Blocks():
if block.Level-indexer.state.Level > 1 {
if block.Level > indexer.state.Level {
indexer.state.Level = block.Level
indexer.state.Hash = block.Hash
indexer.state.Timestamp = block.Timestamp.UTC()
Expand Down
4 changes: 2 additions & 2 deletions cmd/metadata/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func main() {
go func(network string, ind *config.Indexer) {
result, err := startIndexer(ctx, cfg, *ind, network, prometheusService, ipfsNode, views, custom_configs, &hasuraInit)
if err != nil {
log.Err(err).Msg("")
log.Err(err).Str("network", network).Msg("startIndexer")
} else {
indexers.Store(network, result.indexer)
indexerCancels.Store(network, result.cancel)
Expand All @@ -122,7 +122,7 @@ func main() {
case <-ticker.C:
result, err := startIndexer(ctx, cfg, *ind, network, prometheusService, ipfsNode, views, custom_configs, &hasuraInit)
if err != nil {
log.Err(err).Msg("")
log.Err(err).Str("network", network).Msg("startIndexer")
} else {
indexers.Store(network, result.indexer)
indexerCancels.Store(network, result.cancel)
Expand Down
3 changes: 3 additions & 0 deletions cmd/metadata/resolver/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ func (s Http) Resolve(ctx context.Context, network, address, link string) ([]byt

resp, err := s.client.Do(req)
if err != nil {
if errors.Is(err, context.Canceled) {
return nil, err
}
return nil, newResolvingError(0, ErrorTypeReceiving, errors.Wrap(ErrHTTPRequest, err.Error()))
}
defer resp.Body.Close()
Expand Down
4 changes: 4 additions & 0 deletions cmd/metadata/resolver/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package resolver

import (
"context"
"errors"
"strings"
"time"

Expand Down Expand Up @@ -83,6 +84,9 @@ func (s Ipfs) Resolve(ctx context.Context, network, address, link string) (ipfs.

data, err := s.pool.GetFromRandomGateway(requestCtx, link)
if err != nil {
if errors.Is(err, context.Canceled) {
return data, err
}
if s.fallback == "" {
return data, newResolvingError(0, ErrorTypeHttpRequest, err)
}
Expand Down
11 changes: 9 additions & 2 deletions cmd/metadata/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/dipdup-net/metadata/cmd/metadata/models"
"github.com/dipdup-net/metadata/cmd/metadata/prometheus"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)

Expand All @@ -24,7 +25,7 @@ type Service[T models.Model] struct {
tasks chan T
result chan T
queue *Queue
wg sync.WaitGroup
wg *sync.WaitGroup
}

// NewService -
Expand All @@ -39,6 +40,7 @@ func NewService[T models.Model](repo models.ModelRepository[T], handler func(con
network: network,
queue: NewQueue(),
delay: 10,
wg: new(sync.WaitGroup),
}

for i := range opts {
Expand Down Expand Up @@ -97,7 +99,9 @@ func (s *Service[T]) manager(ctx context.Context) {
}
data, err := s.repo.Get(s.network, models.StatusNew, 200, 0, s.maxRetryCount, s.delay)
if err != nil {
log.Err(err).Msg("repo.Get")
if !errors.Is(err, context.Canceled) {
log.Err(err).Msg("repo.Get")
}
continue
}

Expand Down Expand Up @@ -192,6 +196,9 @@ func (s *Service[T]) worker(ctx context.Context) {
defer cancel()

if err := s.handler(resolveCtx, unresolved); err != nil {
if errors.Is(err, context.Canceled) {
return
}
log.Err(err).Msg("resolve")
}

Expand Down
24 changes: 10 additions & 14 deletions cmd/metadata/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"unicode/utf8"

jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"
"github.com/shopspring/decimal"

api "github.com/dipdup-net/go-lib/tzkt/data"
Expand Down Expand Up @@ -113,29 +114,24 @@ func (indexer *Indexer) processTokenMetadata(update api.BigMapUpdate) (*models.T
return &token, nil
}

func (indexer *Indexer) logTokenMetadata(tm models.TokenMetadata, str, level string) {
entry := indexer.log().Str("contract", tm.Contract).Str("token_id", tm.TokenID.String()).Str("link", tm.Link)
switch level {
case "info":
entry.Msg(str)
case "warn":
entry.Msg(str)
case "error":
entry.Msg(str)
}
func (indexer *Indexer) logTokenMetadata(tm models.TokenMetadata, str string) {
indexer.log().Str("contract", tm.Contract).Str("token_id", tm.TokenID.String()).Str("link", tm.Link).Msg(str)
}

func (indexer *Indexer) resolveTokenMetadata(ctx context.Context, tm *models.TokenMetadata) error {
if _, ok := legacyContracts[tm.Contract]; ok {
indexer.logTokenMetadata(*tm, "readonly legacy metadata", "info")
indexer.logTokenMetadata(*tm, "readonly legacy metadata")
return nil
}

indexer.logTokenMetadata(*tm, "trying to resolve", "info")
indexer.logTokenMetadata(*tm, "trying to resolve")
tm.RetryCount += 1

resolved, err := indexer.resolver.Resolve(ctx, tm.Network, tm.Contract, tm.Link)
if err != nil {
if errors.Is(err, context.Canceled) {
return err
}
tm.Error = err.Error()
if e, ok := err.(resolver.ResolvingError); ok {
indexer.prom.IncrementErrorCounter(indexer.network, e)
Expand All @@ -147,10 +143,10 @@ func (indexer *Indexer) resolveTokenMetadata(ctx context.Context, tm *models.Tok
}

if tm.RetryCount < int8(indexer.settings.MaxRetryCountOnError) {
indexer.logTokenMetadata(*tm, fmt.Sprintf("retry: %s", err.Error()), "warn")
indexer.logTokenMetadata(*tm, fmt.Sprintf("retry: %s", err.Error()))
} else {
tm.Status = models.StatusFailed
indexer.logTokenMetadata(*tm, "failed", "warn")
indexer.logTokenMetadata(*tm, "failed")
}
} else {
if utf8.Valid(resolved.Data) {
Expand Down
40 changes: 14 additions & 26 deletions cmd/metadata/tzkt/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ type Scanner struct {
msg Message
contracts []string

diffs chan Message
blocks chan data.Block
wg sync.WaitGroup
initOnce sync.Once
diffs chan Message
blocks chan data.Block
wg *sync.WaitGroup
}

// New -
Expand All @@ -50,6 +49,7 @@ func New(cfg config.DataSource, contracts ...string) (*Scanner, error) {
contracts: contracts,
diffs: make(chan Message, 1024),
blocks: make(chan data.Block, 10),
wg: new(sync.WaitGroup),
}, nil
}

Expand All @@ -59,21 +59,18 @@ func (scanner *Scanner) Start(ctx context.Context, startLevel, endLevel uint64)
return
}

scanner.initOnce.Do(func() {
scanner.wg.Add(1)
go scanner.synchronization(ctx, startLevel, endLevel)
})

scanner.wg.Add(1)
go scanner.synchronization(ctx, startLevel, endLevel)
}

func (scanner *Scanner) start(ctx context.Context) {
if err := scanner.client.Connect(ctx); err != nil {
log.Err(err).Msg("")
log.Err(err).Msg("Connect")
return
}

if err := scanner.subscribe(); err != nil {
log.Err(err).Msg("")
log.Err(err).Msg("subscribe")
return
}

Expand All @@ -86,7 +83,7 @@ func (scanner *Scanner) synchronization(ctx context.Context, startLevel, endLeve

head, err := scanner.api.GetHead(ctx)
if err != nil {
log.Err(err).Msg("")
log.Err(err).Msg("GetHead")
return
}
log.Info().Msgf("Current TzKT head is %d. Indexer state is %d.", head.Level, startLevel)
Expand All @@ -108,14 +105,12 @@ func (scanner *Scanner) synchronization(ctx context.Context, startLevel, endLeve
}

if err := scanner.sync(ctx, head.Level); err != nil {
log.Err(err).Msg("")
return
log.Err(err).Msg("sync")
}

head, err = scanner.api.GetHead(ctx)
if err != nil {
log.Err(err).Msg("")
return
log.Err(err).Msg("GetHead")
}
}
}
Expand Down Expand Up @@ -176,23 +171,16 @@ func (scanner *Scanner) listen(ctx context.Context) {
case msg := <-scanner.client.Listen():
switch msg.Type {
case events.MessageTypeState:
if scanner.level < msg.State {
if err := scanner.client.Close(); err != nil {
log.Err(err).Msg("scanner.client.Close")
}
scanner.synchronization(ctx, scanner.level, 0)
return
}

case events.MessageTypeData:
switch msg.Channel {
case events.ChannelBlocks:
if err := scanner.handleBlocks(msg); err != nil {
log.Err(err).Msg("")
log.Err(err).Msg("handleBlocks")
}
case events.ChannelBigMap:
if err := scanner.handleBigMaps(msg); err != nil {
log.Err(err).Msg("")
log.Err(err).Msg("handleBigMaps")
}
default:
log.Error().Msgf("Unknown channel %s", msg.Channel)
Expand All @@ -207,7 +195,7 @@ func (scanner *Scanner) sync(ctx context.Context, headLevel uint64) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
return nil
default:
if headLevel <= scanner.level {
if scanner.msg.Level > 0 {
Expand Down

0 comments on commit dfd5c80

Please sign in to comment.