From 94f79dc672da8a6c676c016708ccb734bc7d6beb Mon Sep 17 00:00:00 2001 From: Artem Date: Mon, 10 Jan 2022 15:29:41 +0300 Subject: [PATCH 1/5] Refactoring: ipfs --- cmd/metadata/contract.go | 25 ++--- cmd/metadata/indexer.go | 28 ++++- cmd/metadata/main.go | 35 +++--- cmd/metadata/models/db.go | 6 +- cmd/metadata/models/elastic.go | 24 ++++ cmd/metadata/models/ipfs_link.go | 4 +- cmd/metadata/models/source.go | 1 + cmd/metadata/resolver/ipfs.go | 80 +++++++------ cmd/metadata/resolver/resolver.go | 106 +++++++++++++----- cmd/metadata/service/contract.go | 55 +++++---- cmd/metadata/service/token.go | 55 +++++---- cmd/metadata/thumbnail/service.go | 8 +- cmd/metadata/token.go | 30 +++-- internal/ipfs/data.go | 7 ++ internal/ipfs/errors.go | 12 ++ .../ipfs.go => internal/ipfs/functions.go | 20 ++-- internal/ipfs/pool.go | 101 +++++++++++++++++ 17 files changed, 405 insertions(+), 192 deletions(-) create mode 100644 internal/ipfs/data.go create mode 100644 internal/ipfs/errors.go rename cmd/metadata/helpers/ipfs.go => internal/ipfs/functions.go (76%) create mode 100644 internal/ipfs/pool.go diff --git a/cmd/metadata/contract.go b/cmd/metadata/contract.go index a4f2a15..2b190ce 100644 --- a/cmd/metadata/contract.go +++ b/cmd/metadata/contract.go @@ -1,9 +1,7 @@ package main import ( - "bytes" "context" - stdJSON "encoding/json" "fmt" "unicode/utf8" @@ -54,7 +52,7 @@ func (indexer *Indexer) resolveContractMetadata(ctx context.Context, cm *models. indexer.logContractMetadata(*cm, "Trying to resolve", "info") cm.RetryCount += 1 - data, err := indexer.resolver.Resolve(ctx, cm.Network, cm.Contract, cm.Link) + resolved, err := indexer.resolver.Resolve(ctx, cm.Network, cm.Contract, cm.Link) if err != nil { if e, ok := err.(resolver.ResolvingError); ok { indexer.incrementErrorCounter(e) @@ -68,23 +66,22 @@ func (indexer *Indexer) resolveContractMetadata(ctx context.Context, cm *models. indexer.logContractMetadata(*cm, "Failed", "warn") } } else { - escaped := helpers.Escape(data) - - if utf8.Valid(escaped) { + cm.Metadata = helpers.Escape(resolved.Data) + if utf8.Valid(cm.Metadata) { cm.Status = models.StatusApplied - - var dst bytes.Buffer - if err := stdJSON.Compact(&dst, escaped); err != nil { - cm.Metadata = escaped - } else { - cm.Metadata = dst.Bytes() - } } else { - cm.Metadata = escaped cm.Status = models.StatusFailed } } cm.UpdateID = indexer.contractActionsCounter.Increment() indexer.incrementCounter("contract", cm.Status) + + if resolved.By == resolver.ResolverTypeIPFS && cm.Status == models.StatusApplied { + return indexer.db.SaveIPFSLink(models.IPFSLink{ + Link: cm.Link, + Node: resolved.Node, + Data: resolved.Data, + }) + } return nil } diff --git a/cmd/metadata/indexer.go b/cmd/metadata/indexer.go index 5c7d30b..9adc25a 100644 --- a/cmd/metadata/indexer.go +++ b/cmd/metadata/indexer.go @@ -5,8 +5,10 @@ import ( "strconv" "strings" "sync" + "time" "github.com/go-pg/pg/v10" + "github.com/karlseguin/ccache" "github.com/pkg/errors" "github.com/rs/zerolog" @@ -24,6 +26,7 @@ import ( "github.com/dipdup-net/metadata/cmd/metadata/storage" "github.com/dipdup-net/metadata/cmd/metadata/thumbnail" "github.com/dipdup-net/metadata/cmd/metadata/tzkt" + "github.com/dipdup-net/metadata/internal/ipfs" ) // Indexer - @@ -55,11 +58,16 @@ func NewIndexer(ctx context.Context, network string, indexerConfig *config.Index } cont := internalContext.NewContext() + metadataResolver, err := resolver.New(settings, cont) + if err != nil { + return nil, err + } + indexer := &Indexer{ scanner: tzkt.New(indexerConfig.DataSource.Tzkt, filters.Accounts...), network: network, indexName: models.IndexName(network), - resolver: resolver.New(settings, cont), + resolver: metadataResolver, settings: settings, ctx: cont, db: db, @@ -90,6 +98,10 @@ func (indexer *Indexer) Start(ctx context.Context) error { return err } + if err := indexer.resolver.Init(indexer.initResolverCache); err != nil { + return err + } + if err := indexer.ctx.Load(indexer.db); err != nil { return err } @@ -109,6 +121,20 @@ func (indexer *Indexer) Start(ctx context.Context) error { return nil } +func (indexer *Indexer) initResolverCache(c *ccache.Cache) error { + links, err := indexer.db.IPFSLinks(1000, 0) + if err != nil { + return err + } + for i := range links { + c.Set(links[i].Link, ipfs.Data{ + Raw: links[i].Data, + Node: links[i].Node, + }, time.Hour) + } + return nil +} + // Close - func (indexer *Indexer) Close() error { indexer.wg.Wait() diff --git a/cmd/metadata/main.go b/cmd/metadata/main.go index 55a8d6c..58e10aa 100644 --- a/cmd/metadata/main.go +++ b/cmd/metadata/main.go @@ -18,6 +18,9 @@ import ( "github.com/dipdup-net/go-lib/prometheus" "github.com/dipdup-net/metadata/cmd/metadata/config" "github.com/dipdup-net/metadata/cmd/metadata/models" + + "net/http" + _ "net/http/pprof" ) const ( @@ -37,6 +40,10 @@ func main() { TimeFormat: "2006-01-02 15:04:05", }).Level(zerolog.InfoLevel) + go func() { + log.Print(http.ListenAndServe("localhost:6060", nil)) + }() + args := cmdline.Parse() if args.Help { return @@ -57,8 +64,8 @@ func main() { prometheusService := initPrometheus(cfg.Prometheus) - indexers := make(map[string]*Indexer) - indexerCancels := make(map[string]context.CancelFunc) + var indexers sync.Map + var indexerCancels sync.Map var hasuraInit sync.Once for network, indexer := range cfg.Metadata.Indexers { @@ -67,8 +74,8 @@ func main() { if err != nil { log.Err(err).Msg("") } else { - indexers[network] = result.indexer - indexerCancels[network] = result.cancel + indexers.Store(network, result.indexer) + indexerCancels.Store(network, result.cancel) hasuraInit.Do(func() { if err := hasura.Create(ctx, cfg.Hasura, cfg.Database, nil, new(models.TokenMetadata), new(models.ContractMetadata)); err != nil { log.Err(err).Msg("") @@ -89,8 +96,8 @@ func main() { if err != nil { log.Err(err).Msg("") } else { - indexers[network] = result.indexer - indexerCancels[network] = result.cancel + indexers.Store(network, result.indexer) + indexerCancels.Store(network, result.cancel) hasuraInit.Do(func() { if err := hasura.Create(ctx, cfg.Hasura, cfg.Database, nil, new(models.TokenMetadata), new(models.ContractMetadata)); err != nil { log.Err(err).Msg("") @@ -105,18 +112,20 @@ func main() { <-signals - for newtork, cancelIndexer := range indexerCancels { - log.Info().Msgf("stopping %s indexer...", newtork) + indexerCancels.Range(func(key, value interface{}) bool { + log.Info().Msgf("stopping %s indexer...", key.(string)) + cancelIndexer := value.(context.CancelFunc) cancelIndexer() - } + return true + }) log.Warn().Msgf("Trying carefully stopping....") - for _, indexer := range indexers { - if err := indexer.Close(); err != nil { + indexers.Range(func(key, value interface{}) bool { + if err := value.(*Indexer).Close(); err != nil { log.Err(err).Msg("") - return } - } + return err == nil + }) if prometheusService != nil { if err := prometheusService.Close(); err != nil { diff --git a/cmd/metadata/models/db.go b/cmd/metadata/models/db.go index 80f726d..29a9052 100644 --- a/cmd/metadata/models/db.go +++ b/cmd/metadata/models/db.go @@ -37,7 +37,7 @@ func NewRelativeDatabase(ctx context.Context, cfg config.Database) (*RelativeDat database.Wait(ctx, db, 5*time.Second) for _, data := range []interface{}{ - &database.State{}, &ContractMetadata{}, &TokenMetadata{}, &ContextItem{}, + &database.State{}, &ContractMetadata{}, &TokenMetadata{}, &ContextItem{}, &IPFSLink{}, } { if err := db.DB().WithContext(ctx).Model(data).CreateTable(&orm.CreateTableOptions{ IfNotExists: true, @@ -213,13 +213,13 @@ func (db *RelativeDatabase) IPFSLink(id int64) (link IPFSLink, err error) { // IPFSLinks - func (db *RelativeDatabase) IPFSLinks(limit, offset int) (links []IPFSLink, err error) { - err = db.DB().Model(&links).Limit(limit).Offset(offset).Select(&links) + err = db.DB().Model(&links).Limit(limit).Offset(offset).Order("id desc").Select(&links) return } // SaveIPFSLink - func (db *RelativeDatabase) SaveIPFSLink(link IPFSLink) error { - _, err := db.DB().Model(&link).WherePK().SelectOrInsert(&link) + _, err := db.DB().Model(&link).Where("link = ?link").SelectOrInsert(&link) return err } diff --git a/cmd/metadata/models/elastic.go b/cmd/metadata/models/elastic.go index d97f2a2..e006b31 100644 --- a/cmd/metadata/models/elastic.go +++ b/cmd/metadata/models/elastic.go @@ -571,3 +571,27 @@ func (e *Elastic) createIndex(name string) error { } return nil } + +// IPFSLink - +func (e *Elastic) IPFSLink(id int64) (link IPFSLink, err error) { + // TODO: implement + return +} + +// IPFSLinks - +func (e *Elastic) IPFSLinks(limit, offset int) (links []IPFSLink, err error) { + // TODO: implement + return +} + +// SaveIPFSLink - +func (e *Elastic) SaveIPFSLink(link IPFSLink) error { + // TODO: implement + return nil +} + +// UpdateIPFSLink - +func (e *Elastic) UpdateIPFSLink(link IPFSLink) error { + // TODO: implement + return nil +} diff --git a/cmd/metadata/models/ipfs_link.go b/cmd/metadata/models/ipfs_link.go index 1e2a04a..ccb20a2 100644 --- a/cmd/metadata/models/ipfs_link.go +++ b/cmd/metadata/models/ipfs_link.go @@ -7,8 +7,8 @@ type IPFSLink struct { ID int64 Node string - Link string - Data JSONB `pg:",type:jsonb,use_zero"` + Link string `pg:",unique:ipfs"` + Data JSONB `pg:",type:jsonb,use_zero"` } // TableName - diff --git a/cmd/metadata/models/source.go b/cmd/metadata/models/source.go index 77f5460..da42195 100644 --- a/cmd/metadata/models/source.go +++ b/cmd/metadata/models/source.go @@ -13,6 +13,7 @@ type Database interface { ContractRepository TokenRepository ContextRepository + IPFSLinkRepository database.StateRepository io.Closer } diff --git a/cmd/metadata/resolver/ipfs.go b/cmd/metadata/resolver/ipfs.go index 3a9eb1f..f9f67c7 100644 --- a/cmd/metadata/resolver/ipfs.go +++ b/cmd/metadata/resolver/ipfs.go @@ -5,7 +5,7 @@ import ( "strings" "time" - "github.com/dipdup-net/metadata/cmd/metadata/helpers" + "github.com/dipdup-net/metadata/internal/ipfs" "github.com/karlseguin/ccache" shell "github.com/ipfs/go-ipfs-api" @@ -17,23 +17,15 @@ const ( // Ipfs - type Ipfs struct { - Http - - cache *ccache.Cache - pinning []*shell.Shell - gateways []string + cache *ccache.Cache + pinning []*shell.Shell + pool *ipfs.Pool + timeout time.Duration } // IpfsOption - type IpfsOption func(*Ipfs) -// WithTimeoutIpfs - -func WithTimeoutIpfs(timeout uint64) IpfsOption { - return func(s *Ipfs) { - WithTimeoutHttp(timeout)(&s.Http) - } -} - // WithPinningIpfs - func WithPinningIpfs(urls []string) IpfsOption { return func(s *Ipfs) { @@ -48,49 +40,53 @@ func WithPinningIpfs(urls []string) IpfsOption { } } +// WithTimeoutIpfs - +func WithTimeoutIpfs(timeout uint64) IpfsOption { + return func(s *Ipfs) { + s.timeout = time.Duration(timeout) * time.Second + } +} + // NewIPFS - -func NewIPFS(gateways []string, opts ...IpfsOption) Ipfs { +func NewIPFS(gateways []string, opts ...IpfsOption) (Ipfs, error) { + pool, err := ipfs.NewPool(gateways, 1024*1024) + if err != nil { + return Ipfs{}, err + } s := Ipfs{ - Http: NewHttp(), - pinning: make([]*shell.Shell, 0), - gateways: gateways, - cache: ccache.New(ccache.Configure().MaxSize(30000)), + pinning: make([]*shell.Shell, 0), + pool: pool, + cache: ccache.New(ccache.Configure().MaxSize(1000)), } for i := range opts { opts[i](&s) } - return s + return s, nil } // Resolve - -func (s Ipfs) Resolve(ctx context.Context, network, address, link string) ([]byte, error) { - if len(s.gateways) == 0 { - return nil, ErrEmptyIPFSGatewayList - } - - path := helpers.IPFSPath(link) +func (s Ipfs) Resolve(ctx context.Context, network, address, link string) (ipfs.Data, error) { + path := ipfs.Path(link) for _, sh := range s.pinning { _ = sh.Pin(path) } - gateways := helpers.ShuffleGateways(s.gateways) - for i := range gateways { - url := helpers.IPFSLink(gateways[i], path) - data, err := s.cache.Fetch(path, time.Hour, func() (interface{}, error) { - return s.Http.Resolve(ctx, network, address, url) - }) - if err == nil { - contents := data.Value().([]byte) - if len(s.pinning) > 0 { - s.pinContents(contents) - } - return contents, nil - } - } + data, err := s.cache.Fetch(link, time.Hour, func() (interface{}, error) { + requestCtx, cancel := context.WithTimeout(ctx, s.timeout) + defer cancel() - return nil, ErrNoIPFSResponse + return s.pool.Get(requestCtx, link) + }) + if err != nil { + return ipfs.Data{}, newResolvingError(0, ErrorTypeHttpRequest, err) + } + content := data.Value().(ipfs.Data) + if len(s.pinning) > 0 { + s.pinContent(content.Raw) + } + return content, nil } // Is - @@ -98,8 +94,8 @@ func (s Ipfs) Is(link string) bool { return strings.HasPrefix(link, prefixIpfs) } -func (s Ipfs) pinContents(data []byte) { - hash := helpers.FindAllIPFSLinks(data) +func (s Ipfs) pinContent(data []byte) { + hash := ipfs.FindAllLinks(data) for i := range hash { for _, sh := range s.pinning { diff --git a/cmd/metadata/resolver/resolver.go b/cmd/metadata/resolver/resolver.go index c253b8e..d4588d9 100644 --- a/cmd/metadata/resolver/resolver.go +++ b/cmd/metadata/resolver/resolver.go @@ -7,9 +7,21 @@ import ( "github.com/dipdup-net/metadata/cmd/metadata/config" internalContext "github.com/dipdup-net/metadata/cmd/metadata/context" + "github.com/karlseguin/ccache" "github.com/pkg/errors" ) +// ResolverType - +type ResolverType int + +// resolver types +const ( + ResolverTypeIPFS ResolverType = iota + 1 + ResolverTypeHTTP + ResolverTypeTezos + ResolverTypeSha256 +) + // ErrorType - type ErrorType string @@ -41,52 +53,84 @@ func newResolvingError(code int, typ ErrorType, err error) ResolvingError { return ResolvingError{code, typ, err} } -// Resolver - -type Resolver interface { - Resolve(ctx context.Context, network, address, link string) ([]byte, error) - Is(link string) bool +// Resolved - +type Resolved struct { + By ResolverType + Node string + Data []byte } // Receiver - type Receiver struct { - resolvers []Resolver + http Http + ipfs Ipfs + sha Sha256 + tezos TezosStorage } // New - -func New(settings config.Settings, ctx *internalContext.Context) Receiver { - return Receiver{ - []Resolver{ - NewIPFS(settings.IPFSGateways, WithTimeoutIpfs(settings.IPFSTimeout), WithPinningIpfs(settings.IPFSPinning)), - NewTezosStorage(ctx), - NewHttp(WithTimeoutHttp(settings.HTTPTimeout)), - NewSha256(WithTimeoutSha256(settings.HTTPTimeout)), - }, +func New(settings config.Settings, ctx *internalContext.Context) (Receiver, error) { + ipfs, err := NewIPFS(settings.IPFSGateways, WithTimeoutIpfs(settings.IPFSTimeout), WithPinningIpfs(settings.IPFSPinning)) + if err != nil { + return Receiver{}, err } + return Receiver{ + ipfs: ipfs, + tezos: NewTezosStorage(ctx), + http: NewHttp(WithTimeoutHttp(settings.HTTPTimeout)), + sha: NewSha256(WithTimeoutSha256(settings.HTTPTimeout)), + }, nil +} + +// Init - +func (r Receiver) Init(initFunc func(*ccache.Cache) error) error { + return initFunc(r.ipfs.cache) } // Resolve - -func (r Receiver) Resolve(ctx context.Context, network, address, link string) ([]byte, error) { +func (r Receiver) Resolve(ctx context.Context, network, address, link string) (resolved Resolved, err error) { if len(link) < 7 { // the shortest prefix is http:// - return nil, errors.Wrap(ErrUnknownStorageType, link) + return resolved, errors.Wrap(ErrUnknownStorageType, link) } - for i := range r.resolvers { - if r.resolvers[i].Is(link) { - data, err := r.resolvers[i].Resolve(ctx, network, address, link) - if err != nil { - return nil, err - } - if !json.Valid(data) { - return nil, newResolvingError(0, ErrorTypeInvalidJSON, errors.New("invalid json")) - } - - var buf bytes.Buffer - if err := json.Compact(&buf, data); err != nil { - return nil, err - } - return buf.Bytes(), nil + switch { + case r.ipfs.Is(link): + resolved.By = ResolverTypeIPFS + data, err := r.ipfs.Resolve(ctx, network, address, link) + if err != nil { + return resolved, err } + resolved.Data = data.Raw + resolved.Node = data.Node + + case r.tezos.Is(link): + resolved.By = ResolverTypeTezos + resolved.Data, err = r.tezos.Resolve(ctx, network, address, link) + + case r.http.Is(link): + resolved.By = ResolverTypeHTTP + resolved.Data, err = r.http.Resolve(ctx, network, address, link) + + case r.sha.Is(link): + resolved.By = ResolverTypeSha256 + resolved.Data, err = r.sha.Resolve(ctx, network, address, link) + + default: + return resolved, errors.Wrap(ErrUnknownStorageType, link) + } + + if err != nil { + return + } + + if !json.Valid(resolved.Data) { + return resolved, newResolvingError(0, ErrorTypeInvalidJSON, errors.New("invalid json")) } - return nil, errors.Wrap(ErrUnknownStorageType, link) + var buf bytes.Buffer + if err := json.Compact(&buf, resolved.Data); err != nil { + return resolved, err + } + resolved.Data = buf.Bytes() + return } diff --git a/cmd/metadata/service/contract.go b/cmd/metadata/service/contract.go index 6bcdd34..0396eee 100644 --- a/cmd/metadata/service/contract.go +++ b/cmd/metadata/service/contract.go @@ -27,9 +27,9 @@ func NewContractService(db models.Database, handler func(context.Context, *model maxRetryCount: maxRetryCount, db: db, handler: handler, - tasks: make(chan *models.ContractMetadata, 1024*128), - result: make(chan *models.ContractMetadata, 15), - workers: make(chan struct{}, 10), + tasks: make(chan *models.ContractMetadata, 512), + result: make(chan *models.ContractMetadata, 16), + workers: make(chan struct{}, 5), network: network, } } @@ -41,21 +41,6 @@ func (s *ContractService) Start(ctx context.Context) { s.wg.Add(1) go s.saver(ctx) - - var offset int - var end bool - for !end { - contracts, err := s.db.GetContractMetadata(s.network, models.StatusNew, 100, offset, int(s.maxRetryCount)) - if err != nil { - log.Err(err).Msg("GetContractMetadata") - continue - } - for i := range contracts { - s.tasks <- &contracts[i] - } - offset += len(contracts) - end = len(contracts) < 100 - } } // Close - @@ -79,10 +64,26 @@ func (s *ContractService) manager(ctx context.Context) { return } + ticker := time.NewTicker(time.Second * 15) + defer ticker.Stop() + for { select { case <-ctx.Done(): return + case <-ticker.C: + if len(s.tasks) > 100 { + continue + } + contracts, err := s.db.GetContractMetadata(s.network, models.StatusNew, 100, 0, int(s.maxRetryCount)) + if err != nil { + log.Err(err).Msg("GetContractMetadata") + continue + } + for i := range contracts { + s.tasks <- &contracts[i] + } + case unresolved := <-s.tasks: s.workers <- struct{}{} s.wg.Add(1) @@ -94,12 +95,10 @@ func (s *ContractService) manager(ctx context.Context) { func (s *ContractService) saver(ctx context.Context) { defer s.wg.Done() - bulkSize := 8 - ticker := time.NewTicker(time.Second * 15) defer ticker.Stop() - contracts := make([]*models.ContractMetadata, 0, bulkSize) + contracts := make([]*models.ContractMetadata, 0) for { select { case <-ctx.Done(): @@ -108,23 +107,19 @@ func (s *ContractService) saver(ctx context.Context) { case <-ticker.C: if err := s.db.UpdateContractMetadata(ctx, contracts); err != nil { log.Err(err).Msg("UpdateContractMetadata") - return + continue } - contracts = make([]*models.ContractMetadata, 0, bulkSize) + contracts = nil case contract := <-s.result: - if contract.Status != models.StatusApplied && int64(contract.RetryCount) < s.maxRetryCount { - s.tasks <- contract - } - contracts = append(contracts, contract) - if len(contracts) == bulkSize { + if len(contracts) == 8 { if err := s.db.UpdateContractMetadata(ctx, contracts); err != nil { log.Err(err).Msg("UpdateContractMetadata") - return + continue } - contracts = make([]*models.ContractMetadata, 0, bulkSize) + contracts = nil ticker.Reset(time.Second * 15) } } diff --git a/cmd/metadata/service/token.go b/cmd/metadata/service/token.go index d683c81..43b23f9 100644 --- a/cmd/metadata/service/token.go +++ b/cmd/metadata/service/token.go @@ -28,9 +28,9 @@ func NewTokenService(repo models.TokenRepository, handler func(context.Context, network: network, repo: repo, handler: handler, - tasks: make(chan *models.TokenMetadata, 1024*128), - result: make(chan *models.TokenMetadata, 15), - workers: make(chan struct{}, 10), + tasks: make(chan *models.TokenMetadata, 512), + result: make(chan *models.TokenMetadata, 16), + workers: make(chan struct{}, 5), } } @@ -41,21 +41,6 @@ func (s *TokenService) Start(ctx context.Context) { s.wg.Add(1) go s.manager(ctx) - - var offset int - var end bool - for !end { - tokens, err := s.repo.GetTokenMetadata(s.network, models.StatusNew, 100, offset, int(s.maxRetryCount)) - if err != nil { - log.Err(err).Msg("GetTokenMetadata") - continue - } - for i := range tokens { - s.tasks <- &tokens[i] - } - offset += len(tokens) - end = len(tokens) < 100 - } } // Close - @@ -79,10 +64,26 @@ func (s *TokenService) manager(ctx context.Context) { return } + ticker := time.NewTicker(time.Second * 15) + defer ticker.Stop() + for { select { case <-ctx.Done(): return + case <-ticker.C: + if len(s.tasks) > 100 { + continue + } + + tokens, err := s.repo.GetTokenMetadata(s.network, models.StatusNew, 100, 0, int(s.maxRetryCount)) + if err != nil { + log.Err(err).Msg("GetTokenMetadata") + continue + } + for i := range tokens { + s.tasks <- &tokens[i] + } case unresolved := <-s.tasks: s.workers <- struct{}{} s.wg.Add(1) @@ -97,9 +98,7 @@ func (s *TokenService) saver(ctx context.Context) { ticker := time.NewTicker(time.Second * 15) defer ticker.Stop() - bulkSize := 8 - - tokens := make([]*models.TokenMetadata, 0, bulkSize) + tokens := make([]*models.TokenMetadata, 0) for { select { case <-ctx.Done(): @@ -107,23 +106,19 @@ func (s *TokenService) saver(ctx context.Context) { case <-ticker.C: if err := s.repo.UpdateTokenMetadata(ctx, tokens); err != nil { log.Err(err).Msg("UpdateTokenMetadata") - return + continue } - tokens = make([]*models.TokenMetadata, 0, bulkSize) + tokens = nil case token := <-s.result: - if token.Status != models.StatusApplied && int64(token.RetryCount) < s.maxRetryCount { - s.tasks <- token - } - tokens = append(tokens, token) - if len(tokens) == bulkSize { + if len(tokens) == 8 { if err := s.repo.UpdateTokenMetadata(ctx, tokens); err != nil { log.Err(err).Msg("UpdateTokenMetadata") - return + continue } - tokens = make([]*models.TokenMetadata, 0, bulkSize) + tokens = nil ticker.Reset(time.Second * 15) } } diff --git a/cmd/metadata/thumbnail/service.go b/cmd/metadata/thumbnail/service.go index e3e3c44..c624847 100644 --- a/cmd/metadata/thumbnail/service.go +++ b/cmd/metadata/thumbnail/service.go @@ -15,9 +15,9 @@ import ( "time" "github.com/dipdup-net/go-lib/prometheus" - "github.com/dipdup-net/metadata/cmd/metadata/helpers" "github.com/dipdup-net/metadata/cmd/metadata/models" "github.com/dipdup-net/metadata/cmd/metadata/storage" + "github.com/dipdup-net/metadata/internal/ipfs" "github.com/disintegration/imaging" "github.com/pkg/errors" "github.com/rs/zerolog/log" @@ -224,14 +224,14 @@ func (s *Service) fallback(ctx context.Context, link, filename string) error { func (s *Service) resolve(ctx context.Context, link, mime, filename string) error { switch { case strings.HasPrefix(link, "ipfs://"): - hash, err := helpers.IPFSHash(link) + hash, err := ipfs.Hash(link) if err != nil { return err } - gateways := helpers.ShuffleGateways(s.gateways) + gateways := ipfs.ShuffleGateways(s.gateways) for _, gateway := range gateways { - link := helpers.IPFSLink(gateway, hash) + link := ipfs.Link(gateway, hash) if err := s.processLink(ctx, link, mime, filename); err != nil { log.Err(err).Fields(map[string]interface{}{ "link": link, diff --git a/cmd/metadata/token.go b/cmd/metadata/token.go index a18b75c..38c5ba0 100644 --- a/cmd/metadata/token.go +++ b/cmd/metadata/token.go @@ -121,7 +121,7 @@ func (indexer *Indexer) resolveTokenMetadata(ctx context.Context, tm *models.Tok indexer.logTokenMetadata(*tm, "Trying to resolve", "info") tm.RetryCount += 1 - data, err := indexer.resolver.Resolve(ctx, tm.Network, tm.Contract, tm.Link) + resolved, err := indexer.resolver.Resolve(ctx, tm.Network, tm.Contract, tm.Link) if err != nil { if e, ok := err.(resolver.ResolvingError); ok { indexer.incrementErrorCounter(e) @@ -135,29 +135,29 @@ func (indexer *Indexer) resolveTokenMetadata(ctx context.Context, tm *models.Tok indexer.logTokenMetadata(*tm, "Failed", "warn") } } else { - metadata, err := mergeTokenMetadata(tm.Metadata, data) + metadata, err := mergeTokenMetadata(tm.Metadata, resolved.Data) if err != nil { return err } - escaped := helpers.Escape(data) + tm.Metadata = helpers.Escape(metadata) if utf8.Valid(metadata) { tm.Status = models.StatusApplied - - var dst bytes.Buffer - if err := stdJSON.Compact(&dst, escaped); err != nil { - tm.Metadata = escaped - } else { - tm.Metadata = dst.Bytes() - } } else { tm.Status = models.StatusFailed - tm.Metadata = escaped } } indexer.incrementCounter("token", tm.Status) tm.UpdateID = indexer.tokenActionsCounter.Increment() + + if resolved.By == resolver.ResolverTypeIPFS && tm.Status == models.StatusApplied { + return indexer.db.SaveIPFSLink(models.IPFSLink{ + Link: tm.Link, + Node: resolved.Node, + Data: resolved.Data, + }) + } return nil } @@ -184,5 +184,11 @@ func mergeTokenMetadata(src, got []byte) ([]byte, error) { srcMap[key] = value } } - return json.Marshal(srcMap) + data, err := json.Marshal(srcMap) + if err != nil { + return nil, err + } + var dst bytes.Buffer + err = stdJSON.Compact(&dst, data) + return dst.Bytes(), err } diff --git a/internal/ipfs/data.go b/internal/ipfs/data.go new file mode 100644 index 0000000..75fb020 --- /dev/null +++ b/internal/ipfs/data.go @@ -0,0 +1,7 @@ +package ipfs + +// Data - +type Data struct { + Raw []byte + Node string +} diff --git a/internal/ipfs/errors.go b/internal/ipfs/errors.go new file mode 100644 index 0000000..29ac20f --- /dev/null +++ b/internal/ipfs/errors.go @@ -0,0 +1,12 @@ +package ipfs + +import "errors" + +// Errors +var ( + ErrInvalidURI = errors.New("invalid URI") + ErrEmptyIPFSGatewayList = errors.New("empty IPFS gateway list") + ErrHTTPRequest = errors.New("HTTP request error") + ErrJSONDecoding = errors.New("JSON decoding error") + ErrNoIPFSResponse = errors.New("can't load document from IPFS") +) diff --git a/cmd/metadata/helpers/ipfs.go b/internal/ipfs/functions.go similarity index 76% rename from cmd/metadata/helpers/ipfs.go rename to internal/ipfs/functions.go index 642d9d4..49bad36 100644 --- a/cmd/metadata/helpers/ipfs.go +++ b/internal/ipfs/functions.go @@ -1,4 +1,4 @@ -package helpers +package ipfs import ( "fmt" @@ -11,9 +11,9 @@ import ( "github.com/pkg/errors" ) -// IPFSHash - separate IPFS hash from link -func IPFSHash(link string) (string, error) { - hash := FindAllIPFSLinks([]byte(link)) +// Hash - separate IPFS hash from link +func Hash(link string) (string, error) { + hash := FindAllLinks([]byte(link)) if len(hash) != 1 { return "", errors.Errorf("invalid IPFS link: %s", link) } @@ -21,20 +21,20 @@ func IPFSHash(link string) (string, error) { return hash[0], err } -// IPFSLink - get gateway link -func IPFSLink(gateway, hash string) string { +// Link - get gateway link +func Link(gateway, hash string) string { return fmt.Sprintf("%s/ipfs/%s", gateway, hash) } -// IPFSPath - get path without protocol -func IPFSPath(link string) string { +// Path - get path without protocol +func Path(link string) string { return strings.TrimPrefix(link, "ipfs://") } var ipfsURL = regexp.MustCompile(`ipfs:\/\/(?P(baf[123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz]{56})|Qm[123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz]{44})`) -// FindAllIPFSLinks - -func FindAllIPFSLinks(data []byte) []string { +// FindAllLinks - +func FindAllLinks(data []byte) []string { matches := ipfsURL.FindAllSubmatch(data, -1) if len(matches) == 0 { return nil diff --git a/internal/ipfs/pool.go b/internal/ipfs/pool.go new file mode 100644 index 0000000..4a24812 --- /dev/null +++ b/internal/ipfs/pool.go @@ -0,0 +1,101 @@ +package ipfs + +import ( + "context" + "io" + "io/ioutil" + "math/rand" + "net/http" + "net/url" + "time" + + "github.com/pkg/errors" +) + +// Pool - +type Pool struct { + gateways []string + limit int64 +} + +// NewPool - +func NewPool(gateways []string, limit int64) (*Pool, error) { + if len(gateways) == 0 { + return nil, ErrEmptyIPFSGatewayList + } + + for i := range gateways { + if _, err := url.Parse(gateways[i]); err != nil { + return nil, errors.Wrap(ErrInvalidURI, gateways[i]) + } + } + return &Pool{ + gateways: gateways, + limit: limit, + }, nil +} + +// Get - returns result if one of node returns it +func (pool *Pool) Get(ctx context.Context, link string) (Data, error) { + for _, node := range ShuffleGateways(pool.gateways) { + if data, err := pool.request(ctx, link, node); err == nil { + return Data{ + Raw: data, + Node: node, + }, err + } + } + return Data{}, ErrNoIPFSResponse +} + +// GetFromRandomGateway - returns result if random node returns it +func (pool *Pool) GetFromRandomGateway(ctx context.Context, link string) (Data, error) { + rand.Seed(time.Now().UnixNano()) + index := rand.Intn(len(pool.gateways)) + data, err := pool.request(ctx, link, pool.gateways[index]) + if err != nil { + return Data{}, err + } + return Data{ + Raw: data, + Node: pool.gateways[index], + }, nil +} + +// GetFromNode - returns result if `node` returns it +func (pool *Pool) GetFromNode(ctx context.Context, link, node string) (Data, error) { + data, err := pool.request(ctx, link, node) + if err != nil { + return Data{}, err + } + return Data{ + Raw: data, + Node: node, + }, nil +} + +func (pool *Pool) request(ctx context.Context, link, node string) ([]byte, error) { + path := Path(link) + gatewayURL := Link(node, path) + + if _, err := url.ParseRequestURI(gatewayURL); err != nil { + return nil, errors.Wrap(ErrInvalidURI, gatewayURL) + } + req, err := http.NewRequestWithContext(ctx, http.MethodGet, gatewayURL, nil) + if err != nil { + return nil, err + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, errors.Wrap(ErrHTTPRequest, err.Error()) + } + defer resp.Body.Close() + + switch resp.StatusCode { + case http.StatusOK: + return ioutil.ReadAll(io.LimitReader(resp.Body, pool.limit)) + default: + return nil, errors.Errorf("invalid status: %s", resp.Status) + } +} From 659e65ca03b9a08a20a97448a142f8c545451f17 Mon Sep 17 00:00:00 2001 From: Artem Date: Mon, 10 Jan 2022 15:35:17 +0300 Subject: [PATCH 2/5] Dockerfile --- build/metadata/Dockerfile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/build/metadata/Dockerfile b/build/metadata/Dockerfile index cec5a6a..23a6ea5 100644 --- a/build/metadata/Dockerfile +++ b/build/metadata/Dockerfile @@ -17,9 +17,10 @@ WORKDIR $GOPATH/src/github.com/dipdup-net/metadata RUN go mod download COPY cmd/metadata cmd/metadata +COPY internal internal WORKDIR $GOPATH/src/github.com/dipdup-net/metadata/cmd/metadata/ -RUN go build -a -installsuffix cgo -o /go/bin/dipdup-metadata . +RUN go build -a -o /go/bin/dipdup-metadata . # --------------------------------------------------------------------- # The second stage container, for running the application From 548a5d210f7682c720bab71a29ba3d18d1431e00 Mon Sep 17 00:00:00 2001 From: Artem Date: Mon, 10 Jan 2022 18:59:18 +0300 Subject: [PATCH 3/5] Fix: memory leak --- cmd/metadata/contract.go | 9 +++-- cmd/metadata/resolver/ipfs.go | 2 +- cmd/metadata/service/contract.go | 44 +++++++++++++----------- cmd/metadata/service/token.go | 57 ++++++++++++++++++-------------- internal/ipfs/pool.go | 8 ++++- 5 files changed, 69 insertions(+), 51 deletions(-) diff --git a/cmd/metadata/contract.go b/cmd/metadata/contract.go index 2b190ce..5e1111d 100644 --- a/cmd/metadata/contract.go +++ b/cmd/metadata/contract.go @@ -24,16 +24,15 @@ func (indexer *Indexer) processContractMetadata(update api.BigMapUpdate) (*model return nil, err } - metadata := models.ContractMetadata{ + indexer.incrementCounter("contract", models.StatusNew) + + return &models.ContractMetadata{ Network: indexer.network, Contract: update.Contract.Address, Status: models.StatusNew, Link: string(link), UpdateID: indexer.contractActionsCounter.Increment(), - } - indexer.incrementCounter("contract", metadata.Status) - - return &metadata, nil + }, nil } func (indexer *Indexer) logContractMetadata(cm models.ContractMetadata, str, level string) { diff --git a/cmd/metadata/resolver/ipfs.go b/cmd/metadata/resolver/ipfs.go index f9f67c7..aaf5e74 100644 --- a/cmd/metadata/resolver/ipfs.go +++ b/cmd/metadata/resolver/ipfs.go @@ -56,7 +56,7 @@ func NewIPFS(gateways []string, opts ...IpfsOption) (Ipfs, error) { s := Ipfs{ pinning: make([]*shell.Shell, 0), pool: pool, - cache: ccache.New(ccache.Configure().MaxSize(1000)), + cache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), } for i := range opts { diff --git a/cmd/metadata/service/contract.go b/cmd/metadata/service/contract.go index 0396eee..97cf649 100644 --- a/cmd/metadata/service/contract.go +++ b/cmd/metadata/service/contract.go @@ -17,7 +17,6 @@ type ContractService struct { handler func(ctx context.Context, contract *models.ContractMetadata) error tasks chan *models.ContractMetadata result chan *models.ContractMetadata - workers chan struct{} wg sync.WaitGroup } @@ -29,7 +28,6 @@ func NewContractService(db models.Database, handler func(context.Context, *model handler: handler, tasks: make(chan *models.ContractMetadata, 512), result: make(chan *models.ContractMetadata, 16), - workers: make(chan struct{}, 5), network: network, } } @@ -47,7 +45,6 @@ func (s *ContractService) Start(ctx context.Context) { func (s *ContractService) Close() error { s.wg.Wait() - close(s.workers) return nil } @@ -67,10 +64,16 @@ func (s *ContractService) manager(ctx context.Context) { ticker := time.NewTicker(time.Second * 15) defer ticker.Stop() + for i := 0; i < 5; i++ { + s.wg.Add(1) + go s.worker(ctx) + } + for { select { case <-ctx.Done(): return + case <-ticker.C: if len(s.tasks) > 100 { continue @@ -83,11 +86,6 @@ func (s *ContractService) manager(ctx context.Context) { for i := range contracts { s.tasks <- &contracts[i] } - - case unresolved := <-s.tasks: - s.workers <- struct{}{} - s.wg.Add(1) - go s.worker(ctx, unresolved) } } } @@ -105,6 +103,9 @@ func (s *ContractService) saver(ctx context.Context) { return case <-ticker.C: + if len(contracts) == 0 { + continue + } if err := s.db.UpdateContractMetadata(ctx, contracts); err != nil { log.Err(err).Msg("UpdateContractMetadata") continue @@ -127,19 +128,24 @@ func (s *ContractService) saver(ctx context.Context) { } -func (s *ContractService) worker(ctx context.Context, contract *models.ContractMetadata) { - defer func() { - s.wg.Done() - <-s.workers - }() +func (s *ContractService) worker(ctx context.Context) { + defer s.wg.Done() + + for { + select { + case <-ctx.Done(): + return + case unresolved := <-s.tasks: + resolveCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() - resolveCtx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() + if err := s.handler(resolveCtx, unresolved); err != nil { + log.Err(err).Msg("resolve contract") + continue + } - if err := s.handler(resolveCtx, contract); err != nil { - log.Err(err).Msg("resolve contract") - return + s.result <- unresolved + } } - s.result <- contract } diff --git a/cmd/metadata/service/token.go b/cmd/metadata/service/token.go index 43b23f9..95bff23 100644 --- a/cmd/metadata/service/token.go +++ b/cmd/metadata/service/token.go @@ -17,7 +17,6 @@ type TokenService struct { handler func(ctx context.Context, token *models.TokenMetadata) error tasks chan *models.TokenMetadata result chan *models.TokenMetadata - workers chan struct{} wg sync.WaitGroup } @@ -30,7 +29,6 @@ func NewTokenService(repo models.TokenRepository, handler func(context.Context, handler: handler, tasks: make(chan *models.TokenMetadata, 512), result: make(chan *models.TokenMetadata, 16), - workers: make(chan struct{}, 5), } } @@ -47,7 +45,6 @@ func (s *TokenService) Start(ctx context.Context) { func (s *TokenService) Close() error { s.wg.Wait() - close(s.workers) return nil } @@ -67,6 +64,11 @@ func (s *TokenService) manager(ctx context.Context) { ticker := time.NewTicker(time.Second * 15) defer ticker.Stop() + for i := 0; i < 5; i++ { + s.wg.Add(1) + go s.worker(ctx) + } + for { select { case <-ctx.Done(): @@ -84,10 +86,6 @@ func (s *TokenService) manager(ctx context.Context) { for i := range tokens { s.tasks <- &tokens[i] } - case unresolved := <-s.tasks: - s.workers <- struct{}{} - s.wg.Add(1) - go s.worker(ctx, unresolved) } } } @@ -103,12 +101,6 @@ func (s *TokenService) saver(ctx context.Context) { select { case <-ctx.Done(): return - case <-ticker.C: - if err := s.repo.UpdateTokenMetadata(ctx, tokens); err != nil { - log.Err(err).Msg("UpdateTokenMetadata") - continue - } - tokens = nil case token := <-s.result: tokens = append(tokens, token) @@ -121,24 +113,39 @@ func (s *TokenService) saver(ctx context.Context) { tokens = nil ticker.Reset(time.Second * 15) } + + case <-ticker.C: + if len(tokens) == 0 { + continue + } + if err := s.repo.UpdateTokenMetadata(ctx, tokens); err != nil { + log.Err(err).Msg("UpdateTokenMetadata") + continue + } + tokens = nil + } } } -func (s *TokenService) worker(ctx context.Context, token *models.TokenMetadata) { - defer func() { - s.wg.Done() - <-s.workers - }() +func (s *TokenService) worker(ctx context.Context) { + defer s.wg.Done() - resolveCtx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() + for { + select { + case <-ctx.Done(): + return + case unresolved := <-s.tasks: + resolveCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() - if err := s.handler(resolveCtx, token); err != nil { - log.Err(err).Msg("resolve token") - return - } + if err := s.handler(resolveCtx, unresolved); err != nil { + log.Err(err).Msg("resolve token") + return + } - s.result <- token + s.result <- unresolved + } + } } diff --git a/internal/ipfs/pool.go b/internal/ipfs/pool.go index 4a24812..2749643 100644 --- a/internal/ipfs/pool.go +++ b/internal/ipfs/pool.go @@ -16,6 +16,7 @@ import ( type Pool struct { gateways []string limit int64 + client *http.Client } // NewPool - @@ -32,6 +33,11 @@ func NewPool(gateways []string, limit int64) (*Pool, error) { return &Pool{ gateways: gateways, limit: limit, + client: &http.Client{ + Transport: &http.Transport{ + DisableKeepAlives: true, + }, + }, }, nil } @@ -86,7 +92,7 @@ func (pool *Pool) request(ctx context.Context, link, node string) ([]byte, error return nil, err } - resp, err := http.DefaultClient.Do(req) + resp, err := pool.client.Do(req) if err != nil { return nil, errors.Wrap(ErrHTTPRequest, err.Error()) } From ac85cb54c3c100b0450992d4db2b55d732ad6dd5 Mon Sep 17 00:00:00 2001 From: Artem Date: Mon, 10 Jan 2022 19:27:14 +0300 Subject: [PATCH 4/5] Fix: remove pprof --- cmd/metadata/main.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/cmd/metadata/main.go b/cmd/metadata/main.go index 58e10aa..6c2da14 100644 --- a/cmd/metadata/main.go +++ b/cmd/metadata/main.go @@ -18,9 +18,6 @@ import ( "github.com/dipdup-net/go-lib/prometheus" "github.com/dipdup-net/metadata/cmd/metadata/config" "github.com/dipdup-net/metadata/cmd/metadata/models" - - "net/http" - _ "net/http/pprof" ) const ( @@ -40,10 +37,6 @@ func main() { TimeFormat: "2006-01-02 15:04:05", }).Level(zerolog.InfoLevel) - go func() { - log.Print(http.ListenAndServe("localhost:6060", nil)) - }() - args := cmdline.Parse() if args.Help { return From f3c653d4c8ac29b901acd7f7461625f0a2fee23c Mon Sep 17 00:00:00 2001 From: Artem Date: Tue, 11 Jan 2022 14:42:09 +0300 Subject: [PATCH 5/5] Upgrade: go-lib --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 2184e4f..e24864e 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/aws/aws-sdk-go v1.42.20 github.com/btcsuite/btcd v0.22.0-beta // indirect github.com/cenkalti/backoff v2.2.1+incompatible - github.com/dipdup-net/go-lib v0.1.47 + github.com/dipdup-net/go-lib v0.1.49 github.com/disintegration/imaging v1.6.2 github.com/elastic/go-elasticsearch/v8 v8.0.0-20211123103400-5f8a17a2322f github.com/go-pg/pg/v10 v10.10.6 diff --git a/go.sum b/go.sum index 652e130..90d987e 100644 --- a/go.sum +++ b/go.sum @@ -52,8 +52,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218= -github.com/dipdup-net/go-lib v0.1.47 h1:ayXeE1SVq77xCeuwE0s13IDY3wUeoCoukO/ozOKmZ/w= -github.com/dipdup-net/go-lib v0.1.47/go.mod h1:ZOAyo2sqNLCCVdvHvPbiWfgImvmrMW7/sdQiSsGrD44= +github.com/dipdup-net/go-lib v0.1.49 h1:iSw1zkIkHT9VR6TQFHZ2Iq+9kc/7nJkZInUCmG2LVqE= +github.com/dipdup-net/go-lib v0.1.49/go.mod h1:ZOAyo2sqNLCCVdvHvPbiWfgImvmrMW7/sdQiSsGrD44= github.com/disintegration/imaging v1.6.2 h1:w1LecBlG2Lnp8B3jk5zSuNqd7b4DXhcjwek1ei82L+c= github.com/disintegration/imaging v1.6.2/go.mod h1:44/5580QXChDfwIclfc/PCwrr44amcmDAg8hxG0Ewe4= github.com/ebellocchia/go-base58 v0.1.0/go.mod h1:RHE/6C6Ru6YAH9Tc+A9eHQ6ZKEooLC0jw+YLnpt3CAU=