Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring: ipfs #10

Merged
merged 5 commits into from
Jan 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build/metadata/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ WORKDIR $GOPATH/src/github.com/dipdup-net/metadata
RUN go mod download

COPY cmd/metadata cmd/metadata
COPY internal internal

WORKDIR $GOPATH/src/github.com/dipdup-net/metadata/cmd/metadata/
RUN go build -a -installsuffix cgo -o /go/bin/dipdup-metadata .
RUN go build -a -o /go/bin/dipdup-metadata .

# ---------------------------------------------------------------------
# The second stage container, for running the application
Expand Down
34 changes: 15 additions & 19 deletions cmd/metadata/contract.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package main

import (
"bytes"
"context"
stdJSON "encoding/json"
"fmt"
"unicode/utf8"

Expand All @@ -26,16 +24,15 @@ func (indexer *Indexer) processContractMetadata(update api.BigMapUpdate) (*model
return nil, err
}

metadata := models.ContractMetadata{
indexer.incrementCounter("contract", models.StatusNew)

return &models.ContractMetadata{
Network: indexer.network,
Contract: update.Contract.Address,
Status: models.StatusNew,
Link: string(link),
UpdateID: indexer.contractActionsCounter.Increment(),
}
indexer.incrementCounter("contract", metadata.Status)

return &metadata, nil
}, nil
}

func (indexer *Indexer) logContractMetadata(cm models.ContractMetadata, str, level string) {
Expand All @@ -54,7 +51,7 @@ func (indexer *Indexer) resolveContractMetadata(ctx context.Context, cm *models.
indexer.logContractMetadata(*cm, "Trying to resolve", "info")
cm.RetryCount += 1

data, err := indexer.resolver.Resolve(ctx, cm.Network, cm.Contract, cm.Link)
resolved, err := indexer.resolver.Resolve(ctx, cm.Network, cm.Contract, cm.Link)
if err != nil {
if e, ok := err.(resolver.ResolvingError); ok {
indexer.incrementErrorCounter(e)
Expand All @@ -68,23 +65,22 @@ func (indexer *Indexer) resolveContractMetadata(ctx context.Context, cm *models.
indexer.logContractMetadata(*cm, "Failed", "warn")
}
} else {
escaped := helpers.Escape(data)

if utf8.Valid(escaped) {
cm.Metadata = helpers.Escape(resolved.Data)
if utf8.Valid(cm.Metadata) {
cm.Status = models.StatusApplied

var dst bytes.Buffer
if err := stdJSON.Compact(&dst, escaped); err != nil {
cm.Metadata = escaped
} else {
cm.Metadata = dst.Bytes()
}
} else {
cm.Metadata = escaped
cm.Status = models.StatusFailed
}
}
cm.UpdateID = indexer.contractActionsCounter.Increment()
indexer.incrementCounter("contract", cm.Status)

if resolved.By == resolver.ResolverTypeIPFS && cm.Status == models.StatusApplied {
return indexer.db.SaveIPFSLink(models.IPFSLink{
Link: cm.Link,
Node: resolved.Node,
Data: resolved.Data,
})
}
return nil
}
28 changes: 27 additions & 1 deletion cmd/metadata/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/go-pg/pg/v10"
"github.com/karlseguin/ccache"
"github.com/pkg/errors"

"github.com/rs/zerolog"
Expand All @@ -24,6 +26,7 @@ import (
"github.com/dipdup-net/metadata/cmd/metadata/storage"
"github.com/dipdup-net/metadata/cmd/metadata/thumbnail"
"github.com/dipdup-net/metadata/cmd/metadata/tzkt"
"github.com/dipdup-net/metadata/internal/ipfs"
)

// Indexer -
Expand Down Expand Up @@ -55,11 +58,16 @@ func NewIndexer(ctx context.Context, network string, indexerConfig *config.Index
}
cont := internalContext.NewContext()

metadataResolver, err := resolver.New(settings, cont)
if err != nil {
return nil, err
}

indexer := &Indexer{
scanner: tzkt.New(indexerConfig.DataSource.Tzkt, filters.Accounts...),
network: network,
indexName: models.IndexName(network),
resolver: resolver.New(settings, cont),
resolver: metadataResolver,
settings: settings,
ctx: cont,
db: db,
Expand Down Expand Up @@ -90,6 +98,10 @@ func (indexer *Indexer) Start(ctx context.Context) error {
return err
}

if err := indexer.resolver.Init(indexer.initResolverCache); err != nil {
return err
}

if err := indexer.ctx.Load(indexer.db); err != nil {
return err
}
Expand All @@ -109,6 +121,20 @@ func (indexer *Indexer) Start(ctx context.Context) error {
return nil
}

func (indexer *Indexer) initResolverCache(c *ccache.Cache) error {
links, err := indexer.db.IPFSLinks(1000, 0)
if err != nil {
return err
}
for i := range links {
c.Set(links[i].Link, ipfs.Data{
Raw: links[i].Data,
Node: links[i].Node,
}, time.Hour)
}
return nil
}

// Close -
func (indexer *Indexer) Close() error {
indexer.wg.Wait()
Expand Down
28 changes: 15 additions & 13 deletions cmd/metadata/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ func main() {

prometheusService := initPrometheus(cfg.Prometheus)

indexers := make(map[string]*Indexer)
indexerCancels := make(map[string]context.CancelFunc)
var indexers sync.Map
var indexerCancels sync.Map

var hasuraInit sync.Once
for network, indexer := range cfg.Metadata.Indexers {
Expand All @@ -67,8 +67,8 @@ func main() {
if err != nil {
log.Err(err).Msg("")
} else {
indexers[network] = result.indexer
indexerCancels[network] = result.cancel
indexers.Store(network, result.indexer)
indexerCancels.Store(network, result.cancel)
hasuraInit.Do(func() {
if err := hasura.Create(ctx, cfg.Hasura, cfg.Database, nil, new(models.TokenMetadata), new(models.ContractMetadata)); err != nil {
log.Err(err).Msg("")
Expand All @@ -89,8 +89,8 @@ func main() {
if err != nil {
log.Err(err).Msg("")
} else {
indexers[network] = result.indexer
indexerCancels[network] = result.cancel
indexers.Store(network, result.indexer)
indexerCancels.Store(network, result.cancel)
hasuraInit.Do(func() {
if err := hasura.Create(ctx, cfg.Hasura, cfg.Database, nil, new(models.TokenMetadata), new(models.ContractMetadata)); err != nil {
log.Err(err).Msg("")
Expand All @@ -105,18 +105,20 @@ func main() {

<-signals

for newtork, cancelIndexer := range indexerCancels {
log.Info().Msgf("stopping %s indexer...", newtork)
indexerCancels.Range(func(key, value interface{}) bool {
log.Info().Msgf("stopping %s indexer...", key.(string))
cancelIndexer := value.(context.CancelFunc)
cancelIndexer()
}
return true
})

log.Warn().Msgf("Trying carefully stopping....")
for _, indexer := range indexers {
if err := indexer.Close(); err != nil {
indexers.Range(func(key, value interface{}) bool {
if err := value.(*Indexer).Close(); err != nil {
log.Err(err).Msg("")
return
}
}
return err == nil
})

if prometheusService != nil {
if err := prometheusService.Close(); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions cmd/metadata/models/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewRelativeDatabase(ctx context.Context, cfg config.Database) (*RelativeDat
database.Wait(ctx, db, 5*time.Second)

for _, data := range []interface{}{
&database.State{}, &ContractMetadata{}, &TokenMetadata{}, &ContextItem{},
&database.State{}, &ContractMetadata{}, &TokenMetadata{}, &ContextItem{}, &IPFSLink{},
} {
if err := db.DB().WithContext(ctx).Model(data).CreateTable(&orm.CreateTableOptions{
IfNotExists: true,
Expand Down Expand Up @@ -213,13 +213,13 @@ func (db *RelativeDatabase) IPFSLink(id int64) (link IPFSLink, err error) {

// IPFSLinks -
func (db *RelativeDatabase) IPFSLinks(limit, offset int) (links []IPFSLink, err error) {
err = db.DB().Model(&links).Limit(limit).Offset(offset).Select(&links)
err = db.DB().Model(&links).Limit(limit).Offset(offset).Order("id desc").Select(&links)
return
}

// SaveIPFSLink -
func (db *RelativeDatabase) SaveIPFSLink(link IPFSLink) error {
_, err := db.DB().Model(&link).WherePK().SelectOrInsert(&link)
_, err := db.DB().Model(&link).Where("link = ?link").SelectOrInsert(&link)
return err
}

Expand Down
24 changes: 24 additions & 0 deletions cmd/metadata/models/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,3 +571,27 @@ func (e *Elastic) createIndex(name string) error {
}
return nil
}

// IPFSLink -
func (e *Elastic) IPFSLink(id int64) (link IPFSLink, err error) {
// TODO: implement
return
}

// IPFSLinks -
func (e *Elastic) IPFSLinks(limit, offset int) (links []IPFSLink, err error) {
// TODO: implement
return
}

// SaveIPFSLink -
func (e *Elastic) SaveIPFSLink(link IPFSLink) error {
// TODO: implement
return nil
}

// UpdateIPFSLink -
func (e *Elastic) UpdateIPFSLink(link IPFSLink) error {
// TODO: implement
return nil
}
4 changes: 2 additions & 2 deletions cmd/metadata/models/ipfs_link.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ type IPFSLink struct {

ID int64
Node string
Link string
Data JSONB `pg:",type:jsonb,use_zero"`
Link string `pg:",unique:ipfs"`
Data JSONB `pg:",type:jsonb,use_zero"`
}

// TableName -
Expand Down
1 change: 1 addition & 0 deletions cmd/metadata/models/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Database interface {
ContractRepository
TokenRepository
ContextRepository
IPFSLinkRepository
database.StateRepository
io.Closer
}
Expand Down
Loading