Skip to content

Commit

Permalink
Fetching metadata with workers
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Dec 1, 2021
1 parent b10845d commit 039b7fb
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 130 deletions.
4 changes: 4 additions & 0 deletions cmd/metadata/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ type Settings struct {
MaxRetryCountOnError uint64 `yaml:"max_retry_count_on_error"`
Index []string `yaml:"index"`
AWS AWS `yaml:"aws"`
MaxCPU uint64 `yaml:"max_cpu,omitempty"`
}

// Validate -
Expand All @@ -124,6 +125,9 @@ func (cfg *Settings) Validate() error {
if cfg.HTTPTimeout == 0 {
cfg.HTTPTimeout = 10
}
if cfg.MaxCPU == 0 {
cfg.MaxCPU = 4
}
if cfg.MaxRetryCountOnError == 0 {
cfg.MaxRetryCountOnError = 3
}
Expand Down
29 changes: 10 additions & 19 deletions cmd/metadata/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,25 +78,16 @@ func (indexer *Indexer) resolveContractMetadata(ctx context.Context, cm *models.
indexer.incrementCounter("contract", cm.Status)
}

func (indexer *Indexer) onContractTick(ctx context.Context) error {
uresolved, err := indexer.db.GetContractMetadata(models.StatusNew, 15, 0)
if err != nil {
return err
}
for i := range uresolved {
resolveCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
func (indexer *Indexer) contractWorker(ctx context.Context, contract models.ContractMetadata) error {
resolveCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()

indexer.resolveContractMetadata(resolveCtx, &uresolved[i])
indexer.resolveContractMetadata(resolveCtx, &contract)

if err := indexer.db.UpdateContractMetadata(&uresolved[i], map[string]interface{}{
"status": uresolved[i].Status,
"metadata": uresolved[i].Metadata,
"retry_count": uresolved[i].RetryCount,
"update_id": uresolved[i].UpdateID,
}); err != nil {
return err
}
}
return nil
return indexer.db.UpdateContractMetadata(&contract, map[string]interface{}{
"status": contract.Status,
"metadata": contract.Metadata,
"retry_count": contract.RetryCount,
"update_id": contract.UpdateID,
})
}
8 changes: 4 additions & 4 deletions cmd/metadata/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ type Indexer struct {
scanner *tzkt.Scanner
prom *prometheus.Service
ctx *internalContext.Context
contracts *service.Service
tokens *service.Service
contracts *service.ContractService
tokens *service.TokenService
thumbnail *thumbnail.Service
settings config.Settings

Expand Down Expand Up @@ -71,8 +71,8 @@ func NewIndexer(ctx context.Context, network string, indexerConfig *config.Index
if aws := storage.NewAWS(settings.AWS.AccessKey, settings.AWS.Secret, settings.AWS.Region, settings.AWS.BucketName); aws != nil {
indexer.thumbnail = thumbnail.New(aws, db, prom, network, settings.IPFSGateways, 10)
}
indexer.contracts = service.New(indexer.onContractTick, service.WithName("contracts"))
indexer.tokens = service.New(indexer.onTokenTick, service.WithName("tokens"))
indexer.contracts = service.NewContractService(db, indexer.contractWorker)
indexer.tokens = service.NewTokenService(db, indexer.tokenWorker)

return indexer, nil
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/metadata/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"os"
"os/signal"
"runtime"
"sync"
"syscall"
"time"
Expand Down Expand Up @@ -44,6 +45,7 @@ func main() {
log.Error(err)
return
}
runtime.GOMAXPROCS(int(cfg.Metadata.Settings.MaxCPU))

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
75 changes: 75 additions & 0 deletions cmd/metadata/service/contract.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package service

import (
"context"
"sync"

"github.com/dipdup-net/metadata/cmd/metadata/models"
log "github.com/sirupsen/logrus"
)

// ContractService -
type ContractService struct {
repo models.ContractRepository
handler func(ctx context.Context, contract models.ContractMetadata) error
workers chan struct{}
wg sync.WaitGroup
}

// NewContractService -
func NewContractService(repo models.ContractRepository, handler func(context.Context, models.ContractMetadata) error) *ContractService {
return &ContractService{
repo: repo,
handler: handler,
workers: make(chan struct{}, 10),
}
}

// Start -
func (s *ContractService) Start(ctx context.Context) {
s.wg.Add(1)
go s.manager(ctx)
}

// Close -
func (s *ContractService) Close() error {
s.wg.Wait()

close(s.workers)
return nil
}

func (s *ContractService) manager(ctx context.Context) {
defer s.wg.Done()

if s.handler == nil {
log.Warn("processor contract metadata service: service without handler")
return
}

for {
select {
case <-ctx.Done():
return
default:
unresolved, err := s.repo.GetContractMetadata(models.StatusNew, 15, 0)
if err != nil {
log.Error(err)
continue
}
for i := range unresolved {
s.workers <- struct{}{}
s.wg.Add(1)
go func(contract models.ContractMetadata) {
defer func() {
<-s.workers
s.wg.Done()
}()
if err := s.handler(ctx, contract); err != nil {
log.Error(err)
}
}(unresolved[i])
}
}
}
}
20 changes: 0 additions & 20 deletions cmd/metadata/service/options.go

This file was deleted.

66 changes: 0 additions & 66 deletions cmd/metadata/service/service.go

This file was deleted.

75 changes: 75 additions & 0 deletions cmd/metadata/service/token.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package service

import (
"context"
"sync"

"github.com/dipdup-net/metadata/cmd/metadata/models"
log "github.com/sirupsen/logrus"
)

// TokenService -
type TokenService struct {
repo models.TokenRepository
handler func(ctx context.Context, contract models.TokenMetadata) error
workers chan struct{}
wg sync.WaitGroup
}

// NewContractService -
func NewTokenService(repo models.TokenRepository, handler func(context.Context, models.TokenMetadata) error) *TokenService {
return &TokenService{
repo: repo,
handler: handler,
workers: make(chan struct{}, 10),
}
}

// Start -
func (s *TokenService) Start(ctx context.Context) {
s.wg.Add(1)
go s.manager(ctx)
}

// Close -
func (s *TokenService) Close() error {
s.wg.Wait()

close(s.workers)
return nil
}

func (s *TokenService) manager(ctx context.Context) {
defer s.wg.Done()

if s.handler == nil {
log.Warn("processor token metadata service: service without handler")
return
}

for {
select {
case <-ctx.Done():
return
default:
unresolved, err := s.repo.GetTokenMetadata(models.StatusNew, 15, 0)
if err != nil {
log.Error(err)
continue
}
for i := range unresolved {
s.workers <- struct{}{}
s.wg.Add(1)
go func(contract models.TokenMetadata) {
defer func() {
<-s.workers
s.wg.Done()
}()
if err := s.handler(ctx, contract); err != nil {
log.Error(err)
}
}(unresolved[i])
}
}
}
}
32 changes: 11 additions & 21 deletions cmd/metadata/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,27 +183,17 @@ func mergeTokenMetadata(src, got []byte) ([]byte, error) {
return json.Marshal(srcMap)
}

func (indexer *Indexer) onTokenTick(ctx context.Context) error {
uresolved, err := indexer.db.GetTokenMetadata(models.StatusNew, 15, 0)
if err != nil {
return err
}
func (indexer *Indexer) tokenWorker(ctx context.Context, token models.TokenMetadata) error {
resolveCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

for i := range uresolved {
resolveCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

if err := indexer.resolveTokenMetadata(resolveCtx, &uresolved[i]); err != nil {
return err
}
if err := indexer.db.UpdateTokenMetadata(&uresolved[i], map[string]interface{}{
"status": uresolved[i].Status,
"metadata": uresolved[i].Metadata,
"retry_count": uresolved[i].RetryCount,
"update_id": uresolved[i].UpdateID,
}); err != nil {
return err
}
if err := indexer.resolveTokenMetadata(resolveCtx, &token); err != nil {
return err
}
return nil
return indexer.db.UpdateTokenMetadata(&token, map[string]interface{}{
"status": token.Status,
"metadata": token.Metadata,
"retry_count": token.RetryCount,
"update_id": token.UpdateID,
})
}

0 comments on commit 039b7fb

Please sign in to comment.