diff --git a/cmd/metadata/indexer.go b/cmd/metadata/indexer.go index 85b90d9..5c7d30b 100644 --- a/cmd/metadata/indexer.go +++ b/cmd/metadata/indexer.go @@ -78,8 +78,8 @@ func NewIndexer(ctx context.Context, network string, indexerConfig *config.Index thumbnail.WithTimeout(settings.Thumbnail.Timeout), ) } - indexer.contracts = service.NewContractService(db, indexer.resolveContractMetadata, int64(settings.MaxRetryCountOnError)) - indexer.tokens = service.NewTokenService(db, indexer.resolveTokenMetadata, int64(settings.MaxRetryCountOnError)) + indexer.contracts = service.NewContractService(db, indexer.resolveContractMetadata, network, int64(settings.MaxRetryCountOnError)) + indexer.tokens = service.NewTokenService(db, indexer.resolveTokenMetadata, network, int64(settings.MaxRetryCountOnError)) return indexer, nil } diff --git a/cmd/metadata/models/contract_metadata.go b/cmd/metadata/models/contract_metadata.go index 7d14d7d..def0629 100644 --- a/cmd/metadata/models/contract_metadata.go +++ b/cmd/metadata/models/contract_metadata.go @@ -42,7 +42,7 @@ func (cm *ContractMetadata) BeforeUpdate(ctx context.Context) (context.Context, // ContractRepository - type ContractRepository interface { - GetContractMetadata(status Status, limit, offset, retryCount int) ([]ContractMetadata, error) + GetContractMetadata(network string, status Status, limit, offset, retryCount int) ([]ContractMetadata, error) UpdateContractMetadata(ctx context.Context, metadata []*ContractMetadata) error SaveContractMetadata(ctx context.Context, metadata []*ContractMetadata) error LastContractUpdateID() (int64, error) diff --git a/cmd/metadata/models/db.go b/cmd/metadata/models/db.go index 31015e0..80f726d 100644 --- a/cmd/metadata/models/db.go +++ b/cmd/metadata/models/db.go @@ -73,8 +73,8 @@ func (d dbLogger) AfterQuery(c context.Context, q *pg.QueryEvent) error { } // GetContractMetadata - -func (db *RelativeDatabase) GetContractMetadata(status Status, limit, offset, retryCount int) (all []ContractMetadata, err error) { - query := db.DB().Model(&all).Where("status = ?", status) +func (db *RelativeDatabase) GetContractMetadata(network string, status Status, limit, offset, retryCount int) (all []ContractMetadata, err error) { + query := db.DB().Model(&all).Where("status = ?", status).Where("network = ?", network) if limit > 0 { query.Limit(limit) } @@ -117,8 +117,8 @@ func (db *RelativeDatabase) LastContractUpdateID() (updateID int64, err error) { } // GetTokenMetadata - -func (db *RelativeDatabase) GetTokenMetadata(status Status, limit, offset, retryCount int) (all []TokenMetadata, err error) { - query := db.DB().Model(&all).Where("status = ?", status) +func (db *RelativeDatabase) GetTokenMetadata(network string, status Status, limit, offset, retryCount int) (all []TokenMetadata, err error) { + query := db.DB().Model(&all).Where("status = ?", status).Where("network = ?", network) if limit > 0 { query.Limit(limit) } diff --git a/cmd/metadata/models/elastic.go b/cmd/metadata/models/elastic.go index 5fa5a7e..d97f2a2 100644 --- a/cmd/metadata/models/elastic.go +++ b/cmd/metadata/models/elastic.go @@ -118,7 +118,8 @@ func (e *Elastic) bulk(buf *bytes.Buffer) error { } // GetContractMetadata - -func (e *Elastic) GetContractMetadata(status Status, limit, offset, retryCount int) ([]ContractMetadata, error) { +// TODO: filter network +func (e *Elastic) GetContractMetadata(network string, status Status, limit, offset, retryCount int) ([]ContractMetadata, error) { hits, err := e.search( fmt.Sprintf(`{"query":{"term":{"status": %d}}}`, status), e.Search.WithIndex(ContractMetadata{}.TableName()), @@ -228,7 +229,8 @@ func (e *Elastic) LastContractUpdateID() (value int64, err error) { } // GetContractMetadata - -func (e *Elastic) GetTokenMetadata(status Status, limit, offset, retryCount int) ([]TokenMetadata, error) { +// TODO: filter network +func (e *Elastic) GetTokenMetadata(network string, status Status, limit, offset, retryCount int) ([]TokenMetadata, error) { hits, err := e.search( fmt.Sprintf(`{"query":{"term":{"status": %d}}}`, status), e.Search.WithIndex(TokenMetadata{}.TableName()), diff --git a/cmd/metadata/models/token_metadata.go b/cmd/metadata/models/token_metadata.go index 05a08fe..d9ac1b6 100644 --- a/cmd/metadata/models/token_metadata.go +++ b/cmd/metadata/models/token_metadata.go @@ -46,7 +46,7 @@ func (tm *TokenMetadata) BeforeUpdate(ctx context.Context) (context.Context, err // TokenRepository - type TokenRepository interface { - GetTokenMetadata(status Status, limit, offset, retryCount int) ([]TokenMetadata, error) + GetTokenMetadata(network string, status Status, limit, offset, retryCount int) ([]TokenMetadata, error) SetImageProcessed(token TokenMetadata) error GetUnprocessedImage(from uint64, limit int) ([]TokenMetadata, error) UpdateTokenMetadata(ctx context.Context, metadata []*TokenMetadata) error diff --git a/cmd/metadata/service/contract.go b/cmd/metadata/service/contract.go index 93d5d81..6bcdd34 100644 --- a/cmd/metadata/service/contract.go +++ b/cmd/metadata/service/contract.go @@ -11,6 +11,7 @@ import ( // ContractService - type ContractService struct { + network string maxRetryCount int64 db models.Database handler func(ctx context.Context, contract *models.ContractMetadata) error @@ -21,7 +22,7 @@ type ContractService struct { } // NewContractService - -func NewContractService(db models.Database, handler func(context.Context, *models.ContractMetadata) error, maxRetryCount int64) *ContractService { +func NewContractService(db models.Database, handler func(context.Context, *models.ContractMetadata) error, network string, maxRetryCount int64) *ContractService { return &ContractService{ maxRetryCount: maxRetryCount, db: db, @@ -29,6 +30,7 @@ func NewContractService(db models.Database, handler func(context.Context, *model tasks: make(chan *models.ContractMetadata, 1024*128), result: make(chan *models.ContractMetadata, 15), workers: make(chan struct{}, 10), + network: network, } } @@ -43,7 +45,7 @@ func (s *ContractService) Start(ctx context.Context) { var offset int var end bool for !end { - contracts, err := s.db.GetContractMetadata(models.StatusNew, 100, offset, int(s.maxRetryCount)) + contracts, err := s.db.GetContractMetadata(s.network, models.StatusNew, 100, offset, int(s.maxRetryCount)) if err != nil { log.Err(err).Msg("GetContractMetadata") continue diff --git a/cmd/metadata/service/token.go b/cmd/metadata/service/token.go index f39788a..d683c81 100644 --- a/cmd/metadata/service/token.go +++ b/cmd/metadata/service/token.go @@ -11,6 +11,7 @@ import ( // TokenService - type TokenService struct { + network string maxRetryCount int64 repo models.TokenRepository handler func(ctx context.Context, token *models.TokenMetadata) error @@ -21,9 +22,10 @@ type TokenService struct { } // NewContractService - -func NewTokenService(repo models.TokenRepository, handler func(context.Context, *models.TokenMetadata) error, maxRetryCount int64) *TokenService { +func NewTokenService(repo models.TokenRepository, handler func(context.Context, *models.TokenMetadata) error, network string, maxRetryCount int64) *TokenService { return &TokenService{ maxRetryCount: maxRetryCount, + network: network, repo: repo, handler: handler, tasks: make(chan *models.TokenMetadata, 1024*128), @@ -43,7 +45,7 @@ func (s *TokenService) Start(ctx context.Context) { var offset int var end bool for !end { - tokens, err := s.repo.GetTokenMetadata(models.StatusNew, 100, offset, int(s.maxRetryCount)) + tokens, err := s.repo.GetTokenMetadata(s.network, models.StatusNew, 100, offset, int(s.maxRetryCount)) if err != nil { log.Err(err).Msg("GetTokenMetadata") continue