Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mx 16271 indexing incoming tokens #325

Open
wants to merge 18 commits into
base: feat/sovereign
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions client/disabled/elasticClient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package disabled

import (
"bytes"
"context"
)

type elasticClient struct{}

// NewDisabledElasticClient -
func NewDisabledElasticClient() *elasticClient {
return &elasticClient{}
}

// DoBulkRequest -
func (ec *elasticClient) DoBulkRequest(_ context.Context, _ *bytes.Buffer, _ string) error {
return nil
}

// DoQueryRemove -
func (ec *elasticClient) DoQueryRemove(_ context.Context, _ string, _ *bytes.Buffer) error {
return nil
}

// DoMultiGet -
func (ec *elasticClient) DoMultiGet(_ context.Context, _ []string, _ string, _ bool, _ interface{}) error {
return nil
}

// DoScrollRequest -
func (ec *elasticClient) DoScrollRequest(_ context.Context, _ string, _ []byte, _ bool, _ func(responseBytes []byte) error) error {
return nil
}

// DoCountRequest -
func (ec *elasticClient) DoCountRequest(_ context.Context, _ string, _ []byte) (uint64, error) {
return 0, nil
}

// UpdateByQuery -
func (ec *elasticClient) UpdateByQuery(_ context.Context, _ string, _ *bytes.Buffer) error {
return nil
}

// PutMappings -
func (ec *elasticClient) PutMappings(_ string, _ *bytes.Buffer) error {
return nil
}

// CheckAndCreateIndex -
func (ec *elasticClient) CheckAndCreateIndex(_ string) error {
return nil
}

// CheckAndCreateAlias -
func (ec *elasticClient) CheckAndCreateAlias(_ string, _ string) error {
return nil
}

// CheckAndCreateTemplate -
func (ec *elasticClient) CheckAndCreateTemplate(_ string, _ *bytes.Buffer) error {
return nil
}

// CheckAndCreatePolicy -
func (ec *elasticClient) CheckAndCreatePolicy(_ string, _ *bytes.Buffer) error {
return nil
}

// IsInterfaceNil - returns true if there is no value under the interface
func (ec *elasticClient) IsInterfaceNil() bool {
return ec == nil
}
31 changes: 31 additions & 0 deletions client/disabled/elasticClient_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package disabled

import (
"bytes"
"context"
"testing"

"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/stretchr/testify/require"
)

func TestDisabledElasticClient_MethodsShouldNotPanic(t *testing.T) {
t.Parallel()

ec := NewDisabledElasticClient()
require.False(t, check.IfNil(ec))

require.NotPanics(t, func() {
_ = ec.DoBulkRequest(context.Background(), new(bytes.Buffer), "")
_ = ec.DoQueryRemove(context.Background(), "", new(bytes.Buffer))
_ = ec.DoMultiGet(context.Background(), make([]string, 0), "", true, nil)
_ = ec.DoScrollRequest(context.Background(), "", []byte(""), true, nil)
_, _ = ec.DoCountRequest(context.Background(), "", []byte(""))
_ = ec.UpdateByQuery(context.Background(), "", new(bytes.Buffer))
_ = ec.PutMappings("", new(bytes.Buffer))
_ = ec.CheckAndCreateIndex("")
_ = ec.CheckAndCreateAlias("", "")
_ = ec.CheckAndCreateTemplate("", new(bytes.Buffer))
_ = ec.CheckAndCreatePolicy("", new(bytes.Buffer))
})
}
6 changes: 6 additions & 0 deletions cmd/elasticindexer/config/prefs.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,9 @@
username = ""
password = ""
bulk-request-max-size-in-bytes = 4194304 # 4MB

[config.main-elastic-cluster]
mariusmihaic marked this conversation as resolved.
Show resolved Hide resolved
enabled = true
url = "http://localhost:9201"
username = ""
password = ""
9 changes: 8 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ type Config struct {
LogsPath string `toml:"logs-path"`
} `toml:"logs"`
} `toml:"config"`
Sovereign bool
Sovereign bool
ESDTPrefix string
}

// ClusterConfig will hold the config for the Elasticsearch cluster
Expand All @@ -52,6 +53,12 @@ type ClusterConfig struct {
Password string `toml:"password"`
BulkRequestMaxSizeInBytes int `toml:"bulk-request-max-size-in-bytes"`
} `toml:"elastic-cluster"`
MainChainCluster struct {
Enabled bool `toml:"enabled"`
URL string `toml:"url"`
UserName string `toml:"username"`
Password string `toml:"password"`
} `toml:"main-elastic-cluster"`
} `toml:"config"`
}

Expand Down
12 changes: 12 additions & 0 deletions data/tokens.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ type SourceToken struct {
CurrentOwner string `json:"currentOwner"`
}

// ResponseTokenInfo is the structure for the tokens info response
type ResponseTokenInfo struct {
Docs []ResponseTokenInfoDB `json:"docs"`
}

// ResponseTokenInfoDB is the structure for the token info response
type ResponseTokenInfoDB struct {
Found bool `json:"found"`
ID string `json:"_id"`
Source TokenInfo `json:"_source"`
mariusmihaic marked this conversation as resolved.
Show resolved Hide resolved
}

// TokenInfo is a structure that is needed to store information about a token
type TokenInfo struct {
Name string `json:"name,omitempty"`
Expand Down
16 changes: 16 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,22 @@ services:
ports:
- "9200:9200"
- "9300:9300"
main-elasticsearch:
mariusmihaic marked this conversation as resolved.
Show resolved Hide resolved
container_name: es-container2
image: docker.elastic.co/elasticsearch/elasticsearch:7.16.1
environment:
- "discovery.type=single-node"
- "xpack.security.enabled=false"
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
networks:
- es-net
ports:
- "9201:9200"
- "9301:9300"
kibana:
container_name: kb-container
image: docker.elastic.co/kibana/kibana:7.16.1
Expand Down
4 changes: 3 additions & 1 deletion factory/runType/interface.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package runType

import (
"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc"
"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/transactions"
)

// RunTypeComponentsCreator is the interface for creating run type components
type RunTypeComponentsCreator interface {
Create() *runTypeComponents
Create() (*runTypeComponents, error)
IsInterfaceNil() bool
}

Expand All @@ -28,6 +29,7 @@ type RunTypeComponentsHandler interface {
type RunTypeComponentsHolder interface {
TxHashExtractorCreator() transactions.TxHashExtractor
RewardTxDataCreator() transactions.RewardTxDataHandler
IndexTokensHandlerCreator() elasticproc.IndexTokensHandler
Create() error
Close() error
CheckSubcomponents() error
Expand Down
6 changes: 4 additions & 2 deletions factory/runType/runTypeComponents.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package runType

import (
"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc"
"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/transactions"
)

type runTypeComponents struct {
txHashExtractor transactions.TxHashExtractor
rewardTxData transactions.RewardTxDataHandler
txHashExtractor transactions.TxHashExtractor
rewardTxData transactions.RewardTxDataHandler
indexTokensHandler elasticproc.IndexTokensHandler
}

// Close does nothing
Expand Down
10 changes: 6 additions & 4 deletions factory/runType/runTypeComponentsFactory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package runType

import (
"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/tokens"
"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/transactions"
)

Expand All @@ -12,11 +13,12 @@ func NewRunTypeComponentsFactory() *runTypeComponentsFactory {
}

// Create will create the run type components
func (rtcf *runTypeComponentsFactory) Create() *runTypeComponents {
func (rtcf *runTypeComponentsFactory) Create() (*runTypeComponents, error) {
return &runTypeComponents{
txHashExtractor: transactions.NewTxHashExtractor(),
rewardTxData: transactions.NewRewardTxData(),
}
txHashExtractor: transactions.NewTxHashExtractor(),
rewardTxData: transactions.NewRewardTxData(),
indexTokensHandler: tokens.NewDisabledIndexTokensHandler(),
}, nil
}

// IsInterfaceNil returns true if there is no value under the interface
Expand Down
21 changes: 20 additions & 1 deletion factory/runType/runTypeComponentsHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/multiversx/mx-chain-core-go/core/check"

"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc"
"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/transactions"
)

Expand Down Expand Up @@ -34,7 +35,10 @@ func NewManagedRunTypeComponents(rtc RunTypeComponentsCreator) (*managedRunTypeC

// Create will create the managed components
func (mrtc *managedRunTypeComponents) Create() error {
rtc := mrtc.factory.Create()
rtc, err := mrtc.factory.Create()
if err != nil {
return err
}

mrtc.mutRunTypeCoreComponents.Lock()
mrtc.runTypeComponents = rtc
Expand Down Expand Up @@ -75,6 +79,9 @@ func (mrtc *managedRunTypeComponents) CheckSubcomponents() error {
if check.IfNil(mrtc.rewardTxData) {
return transactions.ErrNilRewardTxDataHandler
}
if check.IfNil(mrtc.indexTokensHandler) {
return transactions.ErrNilIndexTokensHandler
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe you should move transactions.ErrNilIndexTokensHandler in another package, because it is not relayed with the transactions package

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to dataIndexer

}
return nil
}

Expand Down Expand Up @@ -102,6 +109,18 @@ func (mrtc *managedRunTypeComponents) RewardTxDataCreator() transactions.RewardT
return mrtc.runTypeComponents.rewardTxData
}

// IndexTokensHandlerCreator returns the index tokens handler
func (mrtc *managedRunTypeComponents) IndexTokensHandlerCreator() elasticproc.IndexTokensHandler {
mrtc.mutRunTypeCoreComponents.Lock()
defer mrtc.mutRunTypeCoreComponents.Unlock()

if check.IfNil(mrtc.runTypeComponents) {
return nil
}

return mrtc.runTypeComponents.indexTokensHandler
}

// IsInterfaceNil returns true if the interface is nil
func (mrtc *managedRunTypeComponents) IsInterfaceNil() bool {
return mrtc == nil
Expand Down
6 changes: 4 additions & 2 deletions factory/runType/runTypeComponents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ func TestRunTypeComponentsFactory_Create(t *testing.T) {
rtcf := NewRunTypeComponentsFactory()
require.NotNil(t, rtcf)

rtc := rtcf.Create()
rtc, err := rtcf.Create()
require.NotNil(t, rtc)
require.NoError(t, err)
}

func TestRunTypeComponentsFactory_Close(t *testing.T) {
Expand All @@ -31,8 +32,9 @@ func TestRunTypeComponentsFactory_Close(t *testing.T) {
rtcf := NewRunTypeComponentsFactory()
require.NotNil(t, rtcf)

rtc := rtcf.Create()
rtc, err := rtcf.Create()
require.NotNil(t, rtc)
require.NoError(t, err)

require.NoError(t, rtc.Close())
}
61 changes: 55 additions & 6 deletions factory/runType/sovereignRunTypeComponentsFactory.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,73 @@
package runType

import (
"math"
"net/http"
"time"

"github.com/elastic/go-elasticsearch/v7"

"github.com/multiversx/mx-chain-es-indexer-go/client"
"github.com/multiversx/mx-chain-es-indexer-go/client/disabled"
"github.com/multiversx/mx-chain-es-indexer-go/client/logging"
"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc"
"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/factory"
"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/tokens"
"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/transactions"
)

type sovereignRunTypeComponentsFactory struct{}
type sovereignRunTypeComponentsFactory struct {
mainChainElastic factory.ElasticConfig
esdtPrefix string
}

// NewSovereignRunTypeComponentsFactory will return a new instance of sovereign run type components factory
func NewSovereignRunTypeComponentsFactory() *sovereignRunTypeComponentsFactory {
return &sovereignRunTypeComponentsFactory{}
func NewSovereignRunTypeComponentsFactory(mainChainElastic factory.ElasticConfig, esdtPrefix string) *sovereignRunTypeComponentsFactory {
mariusmihaic marked this conversation as resolved.
Show resolved Hide resolved
return &sovereignRunTypeComponentsFactory{
mainChainElastic: mainChainElastic,
esdtPrefix: esdtPrefix,
}
}

// Create will create the run type components
func (srtcf *sovereignRunTypeComponentsFactory) Create() *runTypeComponents {
func (srtcf *sovereignRunTypeComponentsFactory) Create() (*runTypeComponents, error) {
mainChainElasticClient, err := createMainChainElasticClient(srtcf.mainChainElastic)
if err != nil {
return nil, err
}

sovIndexTokensHandler, err := tokens.NewSovereignIndexTokensHandler(srtcf.mainChainElastic.Enabled, mainChainElasticClient, srtcf.esdtPrefix)
if err != nil {
return nil, err
}

return &runTypeComponents{
txHashExtractor: transactions.NewSovereignTxHashExtractor(),
rewardTxData: transactions.NewSovereignRewardTxData(),
txHashExtractor: transactions.NewSovereignTxHashExtractor(),
rewardTxData: transactions.NewSovereignRewardTxData(),
indexTokensHandler: sovIndexTokensHandler,
}, nil
}

func createMainChainElasticClient(mainChainElastic factory.ElasticConfig) (elasticproc.DatabaseClientHandler, error) {
if mainChainElastic.Enabled {
argsEsClient := elasticsearch.Config{
Addresses: []string{mainChainElastic.Url},
Username: mainChainElastic.UserName,
Password: mainChainElastic.Password,
Logger: &logging.CustomLogger{},
RetryOnStatus: []int{http.StatusConflict},
RetryBackoff: retryBackOff,
}
return client.NewElasticClient(argsEsClient)
} else {
return disabled.NewDisabledElasticClient(), nil
}
}

func retryBackOff(attempt int) time.Duration {
mariusmihaic marked this conversation as resolved.
Show resolved Hide resolved
return time.Duration(math.Exp2(float64(attempt))) * time.Second
}

// IsInterfaceNil returns true if there is no value under the interface
func (srtcf *sovereignRunTypeComponentsFactory) IsInterfaceNil() bool {
return srtcf == nil
Expand Down
Loading