Skip to content

Commit

Permalink
Refactoring: ipfs (#10)
Browse files Browse the repository at this point in the history
* Refactoring: ipfs

* Dockerfile

* Fix: memory leak

* Fix: remove pprof

* Upgrade: go-lib
  • Loading branch information
aopoltorzhicky authored Jan 11, 2022
1 parent 8ddc628 commit f9ccff3
Show file tree
Hide file tree
Showing 20 changed files with 465 additions and 240 deletions.
3 changes: 2 additions & 1 deletion build/metadata/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ WORKDIR $GOPATH/src/github.com/dipdup-net/metadata
RUN go mod download

COPY cmd/metadata cmd/metadata
COPY internal internal

WORKDIR $GOPATH/src/github.com/dipdup-net/metadata/cmd/metadata/
RUN go build -a -installsuffix cgo -o /go/bin/dipdup-metadata .
RUN go build -a -o /go/bin/dipdup-metadata .

# ---------------------------------------------------------------------
# The second stage container, for running the application
Expand Down
34 changes: 15 additions & 19 deletions cmd/metadata/contract.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package main

import (
"bytes"
"context"
stdJSON "encoding/json"
"fmt"
"unicode/utf8"

Expand All @@ -26,16 +24,15 @@ func (indexer *Indexer) processContractMetadata(update api.BigMapUpdate) (*model
return nil, err
}

metadata := models.ContractMetadata{
indexer.incrementCounter("contract", models.StatusNew)

return &models.ContractMetadata{
Network: indexer.network,
Contract: update.Contract.Address,
Status: models.StatusNew,
Link: string(link),
UpdateID: indexer.contractActionsCounter.Increment(),
}
indexer.incrementCounter("contract", metadata.Status)

return &metadata, nil
}, nil
}

func (indexer *Indexer) logContractMetadata(cm models.ContractMetadata, str, level string) {
Expand All @@ -54,7 +51,7 @@ func (indexer *Indexer) resolveContractMetadata(ctx context.Context, cm *models.
indexer.logContractMetadata(*cm, "Trying to resolve", "info")
cm.RetryCount += 1

data, err := indexer.resolver.Resolve(ctx, cm.Network, cm.Contract, cm.Link)
resolved, err := indexer.resolver.Resolve(ctx, cm.Network, cm.Contract, cm.Link)
if err != nil {
if e, ok := err.(resolver.ResolvingError); ok {
indexer.incrementErrorCounter(e)
Expand All @@ -68,23 +65,22 @@ func (indexer *Indexer) resolveContractMetadata(ctx context.Context, cm *models.
indexer.logContractMetadata(*cm, "Failed", "warn")
}
} else {
escaped := helpers.Escape(data)

if utf8.Valid(escaped) {
cm.Metadata = helpers.Escape(resolved.Data)
if utf8.Valid(cm.Metadata) {
cm.Status = models.StatusApplied

var dst bytes.Buffer
if err := stdJSON.Compact(&dst, escaped); err != nil {
cm.Metadata = escaped
} else {
cm.Metadata = dst.Bytes()
}
} else {
cm.Metadata = escaped
cm.Status = models.StatusFailed
}
}
cm.UpdateID = indexer.contractActionsCounter.Increment()
indexer.incrementCounter("contract", cm.Status)

if resolved.By == resolver.ResolverTypeIPFS && cm.Status == models.StatusApplied {
return indexer.db.SaveIPFSLink(models.IPFSLink{
Link: cm.Link,
Node: resolved.Node,
Data: resolved.Data,
})
}
return nil
}
28 changes: 27 additions & 1 deletion cmd/metadata/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/go-pg/pg/v10"
"github.com/karlseguin/ccache"
"github.com/pkg/errors"

"github.com/rs/zerolog"
Expand All @@ -24,6 +26,7 @@ import (
"github.com/dipdup-net/metadata/cmd/metadata/storage"
"github.com/dipdup-net/metadata/cmd/metadata/thumbnail"
"github.com/dipdup-net/metadata/cmd/metadata/tzkt"
"github.com/dipdup-net/metadata/internal/ipfs"
)

// Indexer -
Expand Down Expand Up @@ -55,11 +58,16 @@ func NewIndexer(ctx context.Context, network string, indexerConfig *config.Index
}
cont := internalContext.NewContext()

metadataResolver, err := resolver.New(settings, cont)
if err != nil {
return nil, err
}

indexer := &Indexer{
scanner: tzkt.New(indexerConfig.DataSource.Tzkt, filters.Accounts...),
network: network,
indexName: models.IndexName(network),
resolver: resolver.New(settings, cont),
resolver: metadataResolver,
settings: settings,
ctx: cont,
db: db,
Expand Down Expand Up @@ -90,6 +98,10 @@ func (indexer *Indexer) Start(ctx context.Context) error {
return err
}

if err := indexer.resolver.Init(indexer.initResolverCache); err != nil {
return err
}

if err := indexer.ctx.Load(indexer.db); err != nil {
return err
}
Expand All @@ -109,6 +121,20 @@ func (indexer *Indexer) Start(ctx context.Context) error {
return nil
}

func (indexer *Indexer) initResolverCache(c *ccache.Cache) error {
links, err := indexer.db.IPFSLinks(1000, 0)
if err != nil {
return err
}
for i := range links {
c.Set(links[i].Link, ipfs.Data{
Raw: links[i].Data,
Node: links[i].Node,
}, time.Hour)
}
return nil
}

// Close -
func (indexer *Indexer) Close() error {
indexer.wg.Wait()
Expand Down
28 changes: 15 additions & 13 deletions cmd/metadata/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ func main() {

prometheusService := initPrometheus(cfg.Prometheus)

indexers := make(map[string]*Indexer)
indexerCancels := make(map[string]context.CancelFunc)
var indexers sync.Map
var indexerCancels sync.Map

var hasuraInit sync.Once
for network, indexer := range cfg.Metadata.Indexers {
Expand All @@ -67,8 +67,8 @@ func main() {
if err != nil {
log.Err(err).Msg("")
} else {
indexers[network] = result.indexer
indexerCancels[network] = result.cancel
indexers.Store(network, result.indexer)
indexerCancels.Store(network, result.cancel)
hasuraInit.Do(func() {
if err := hasura.Create(ctx, cfg.Hasura, cfg.Database, nil, new(models.TokenMetadata), new(models.ContractMetadata)); err != nil {
log.Err(err).Msg("")
Expand All @@ -89,8 +89,8 @@ func main() {
if err != nil {
log.Err(err).Msg("")
} else {
indexers[network] = result.indexer
indexerCancels[network] = result.cancel
indexers.Store(network, result.indexer)
indexerCancels.Store(network, result.cancel)
hasuraInit.Do(func() {
if err := hasura.Create(ctx, cfg.Hasura, cfg.Database, nil, new(models.TokenMetadata), new(models.ContractMetadata)); err != nil {
log.Err(err).Msg("")
Expand All @@ -105,18 +105,20 @@ func main() {

<-signals

for newtork, cancelIndexer := range indexerCancels {
log.Info().Msgf("stopping %s indexer...", newtork)
indexerCancels.Range(func(key, value interface{}) bool {
log.Info().Msgf("stopping %s indexer...", key.(string))
cancelIndexer := value.(context.CancelFunc)
cancelIndexer()
}
return true
})

log.Warn().Msgf("Trying carefully stopping....")
for _, indexer := range indexers {
if err := indexer.Close(); err != nil {
indexers.Range(func(key, value interface{}) bool {
if err := value.(*Indexer).Close(); err != nil {
log.Err(err).Msg("")
return
}
}
return err == nil
})

if prometheusService != nil {
if err := prometheusService.Close(); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions cmd/metadata/models/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewRelativeDatabase(ctx context.Context, cfg config.Database) (*RelativeDat
database.Wait(ctx, db, 5*time.Second)

for _, data := range []interface{}{
&database.State{}, &ContractMetadata{}, &TokenMetadata{}, &ContextItem{},
&database.State{}, &ContractMetadata{}, &TokenMetadata{}, &ContextItem{}, &IPFSLink{},
} {
if err := db.DB().WithContext(ctx).Model(data).CreateTable(&orm.CreateTableOptions{
IfNotExists: true,
Expand Down Expand Up @@ -213,13 +213,13 @@ func (db *RelativeDatabase) IPFSLink(id int64) (link IPFSLink, err error) {

// IPFSLinks -
func (db *RelativeDatabase) IPFSLinks(limit, offset int) (links []IPFSLink, err error) {
err = db.DB().Model(&links).Limit(limit).Offset(offset).Select(&links)
err = db.DB().Model(&links).Limit(limit).Offset(offset).Order("id desc").Select(&links)
return
}

// SaveIPFSLink -
func (db *RelativeDatabase) SaveIPFSLink(link IPFSLink) error {
_, err := db.DB().Model(&link).WherePK().SelectOrInsert(&link)
_, err := db.DB().Model(&link).Where("link = ?link").SelectOrInsert(&link)
return err
}

Expand Down
24 changes: 24 additions & 0 deletions cmd/metadata/models/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,3 +571,27 @@ func (e *Elastic) createIndex(name string) error {
}
return nil
}

// IPFSLink -
func (e *Elastic) IPFSLink(id int64) (link IPFSLink, err error) {
// TODO: implement
return
}

// IPFSLinks -
func (e *Elastic) IPFSLinks(limit, offset int) (links []IPFSLink, err error) {
// TODO: implement
return
}

// SaveIPFSLink -
func (e *Elastic) SaveIPFSLink(link IPFSLink) error {
// TODO: implement
return nil
}

// UpdateIPFSLink -
func (e *Elastic) UpdateIPFSLink(link IPFSLink) error {
// TODO: implement
return nil
}
4 changes: 2 additions & 2 deletions cmd/metadata/models/ipfs_link.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ type IPFSLink struct {

ID int64
Node string
Link string
Data JSONB `pg:",type:jsonb,use_zero"`
Link string `pg:",unique:ipfs"`
Data JSONB `pg:",type:jsonb,use_zero"`
}

// TableName -
Expand Down
1 change: 1 addition & 0 deletions cmd/metadata/models/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Database interface {
ContractRepository
TokenRepository
ContextRepository
IPFSLinkRepository
database.StateRepository
io.Closer
}
Expand Down
Loading

0 comments on commit f9ccff3

Please sign in to comment.