Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mx 16271 indexing incoming tokens #325

Open
wants to merge 15 commits into
base: feat/sovereign
Choose a base branch
from
78 changes: 78 additions & 0 deletions client/disabled/elasticClient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package disabled

import (
"bytes"
"context"
)

type elasticClient struct{}

// NewDisabledElasticClient -
func NewDisabledElasticClient() *elasticClient {
return &elasticClient{}
}

// DoBulkRequest -
func (ec *elasticClient) DoBulkRequest(_ context.Context, _ *bytes.Buffer, _ string) error {
return nil
}

// DoQueryRemove -
func (ec *elasticClient) DoQueryRemove(_ context.Context, _ string, _ *bytes.Buffer) error {
return nil
}

// DoMultiGet -
func (ec *elasticClient) DoMultiGet(_ context.Context, _ []string, _ string, _ bool, _ interface{}) error {
return nil
}

// DoScrollRequest -
func (ec *elasticClient) DoScrollRequest(_ context.Context, _ string, _ []byte, _ bool, _ func(responseBytes []byte) error) error {
return nil
}

// DoCountRequest -
func (ec *elasticClient) DoCountRequest(_ context.Context, _ string, _ []byte) (uint64, error) {
return 0, nil
}

// UpdateByQuery -
func (ec *elasticClient) UpdateByQuery(_ context.Context, _ string, _ *bytes.Buffer) error {
return nil
}

// PutMappings -
func (ec *elasticClient) PutMappings(_ string, _ *bytes.Buffer) error {
return nil
}

// CheckAndCreateIndex -
func (ec *elasticClient) CheckAndCreateIndex(_ string) error {
return nil
}

// CheckAndCreateAlias -
func (ec *elasticClient) CheckAndCreateAlias(_ string, _ string) error {
return nil
}

// CheckAndCreateTemplate -
func (ec *elasticClient) CheckAndCreateTemplate(_ string, _ *bytes.Buffer) error {
return nil
}

// CheckAndCreatePolicy -
func (ec *elasticClient) CheckAndCreatePolicy(_ string, _ *bytes.Buffer) error {
return nil
}

// IsEnabled -
func (ec *elasticClient) IsEnabled() bool {
return false
}

// IsInterfaceNil - returns true if there is no value under the interface
func (ec *elasticClient) IsInterfaceNil() bool {
return ec == nil
}
31 changes: 31 additions & 0 deletions client/disabled/elasticClient_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package disabled

import (
"bytes"
"context"
"testing"

"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/stretchr/testify/require"
)

func TestDisabledElasticClient_MethodsShouldNotPanic(t *testing.T) {
t.Parallel()

ec := NewDisabledElasticClient()
require.False(t, check.IfNil(ec))

require.NotPanics(t, func() {
_ = ec.DoBulkRequest(context.Background(), new(bytes.Buffer), "")
_ = ec.DoQueryRemove(context.Background(), "", new(bytes.Buffer))
_ = ec.DoMultiGet(context.Background(), make([]string, 0), "", true, nil)
_ = ec.DoScrollRequest(context.Background(), "", []byte(""), true, nil)
_, _ = ec.DoCountRequest(context.Background(), "", []byte(""))
_ = ec.UpdateByQuery(context.Background(), "", new(bytes.Buffer))
_ = ec.PutMappings("", new(bytes.Buffer))
_ = ec.CheckAndCreateIndex("")
_ = ec.CheckAndCreateAlias("", "")
_ = ec.CheckAndCreateTemplate("", new(bytes.Buffer))
_ = ec.CheckAndCreatePolicy("", new(bytes.Buffer))
})
}
11 changes: 11 additions & 0 deletions client/elasticClientCommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"net/http"
"net/url"
"strings"
"time"

"github.com/elastic/go-elasticsearch/v7/esapi"

"github.com/multiversx/mx-chain-es-indexer-go/data"
"github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer"
)
Expand Down Expand Up @@ -268,3 +271,11 @@ func parseResponse(res *esapi.Response, dest interface{}, errorHandler responseE

return nil
}

// RetryBackOff returns elastic retry backoff duration
func RetryBackOff(attempt int) time.Duration {
d := time.Duration(math.Exp2(float64(attempt))) * time.Second
log.Debug("elastic: retry backoff", "attempt", attempt, "sleep duration", d)

return d
}
28 changes: 28 additions & 0 deletions client/mainChainElasticClient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package client

import (
"github.com/elastic/go-elasticsearch/v7"
)

type mainChainElasticClient struct {
*elasticClient
indexingEnabled bool
}

// NewMainChainElasticClient creates a new sovereign elastic client
func NewMainChainElasticClient(cfg elasticsearch.Config, indexingEnabled bool) (*mainChainElasticClient, error) {
esClient, err := NewElasticClient(cfg)
if err != nil {
return nil, err
}

return &mainChainElasticClient{
esClient,
indexingEnabled,
}, nil
}

// IsEnabled returns true if main chain elastic client is enabled
func (mcec *mainChainElasticClient) IsEnabled() bool {
return mcec.indexingEnabled
}
36 changes: 36 additions & 0 deletions client/mainChainElasticClient_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package client

import (
"fmt"
"testing"

"github.com/elastic/go-elasticsearch/v7"
"github.com/stretchr/testify/require"

indexer "github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer"
)

func TestNewMainChainElasticClient(t *testing.T) {
t.Run("no url, should error", func(t *testing.T) {
esClient, err := NewMainChainElasticClient(elasticsearch.Config{
Addresses: []string{},
}, true)
require.Nil(t, esClient)
require.Equal(t, indexer.ErrNoElasticUrlProvided, err)
})
t.Run("should work", func(t *testing.T) {
esClient, err := NewMainChainElasticClient(elasticsearch.Config{
Addresses: []string{"http://localhost:9200"},
}, true)
require.Nil(t, err)
require.Equal(t, "*client.mainChainElasticClient", fmt.Sprintf("%T", esClient))
})
}

func TestMainChainElasticClient_IsEnabled(t *testing.T) {
esClient, err := NewMainChainElasticClient(elasticsearch.Config{
Addresses: []string{"http://localhost:9200"},
}, true)
require.Nil(t, err)
require.Equal(t, true, esClient.IsEnabled())
}
6 changes: 6 additions & 0 deletions cmd/elasticindexer/config/prefs.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,9 @@
username = ""
password = ""
bulk-request-max-size-in-bytes = 4194304 # 4MB

[config.main-chain-elastic-cluster]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe you can add a comment here to describe for what is used this section of config

enabled = true
url = "http://localhost:9201"
username = ""
password = ""
9 changes: 8 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ type Config struct {
LogsPath string `toml:"logs-path"`
} `toml:"logs"`
} `toml:"config"`
Sovereign bool
Sovereign bool
ESDTPrefix string
}

// ClusterConfig will hold the config for the Elasticsearch cluster
Expand All @@ -52,6 +53,12 @@ type ClusterConfig struct {
Password string `toml:"password"`
BulkRequestMaxSizeInBytes int `toml:"bulk-request-max-size-in-bytes"`
} `toml:"elastic-cluster"`
MainChainCluster struct {
Enabled bool `toml:"enabled"`
URL string `toml:"url"`
UserName string `toml:"username"`
Password string `toml:"password"`
} `toml:"main-chain-elastic-cluster"`
} `toml:"config"`
}

Expand Down
12 changes: 12 additions & 0 deletions data/tokens.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ type SourceToken struct {
CurrentOwner string `json:"currentOwner"`
}

// ResponseTokenInfo is the structure for the tokens info response
type ResponseTokenInfo struct {
Docs []ResponseTokenInfoDB `json:"docs"`
}

// ResponseTokenInfoDB is the structure for the token info response
type ResponseTokenInfoDB struct {
Found bool `json:"found"`
ID string `json:"_id"`
Source TokenInfo `json:"_source"`
mariusmihaic marked this conversation as resolved.
Show resolved Hide resolved
}

// TokenInfo is a structure that is needed to store information about a token
type TokenInfo struct {
Name string `json:"name,omitempty"`
Expand Down
16 changes: 16 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,22 @@ services:
ports:
- "9200:9200"
- "9300:9300"
main-chain-elasticsearch:
container_name: es-container2
image: docker.elastic.co/elasticsearch/elasticsearch:7.16.1
environment:
- "discovery.type=single-node"
- "xpack.security.enabled=false"
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
networks:
- es-net
ports:
- "9201:9200"
- "9301:9300"
kibana:
container_name: kb-container
image: docker.elastic.co/kibana/kibana:7.16.1
Expand Down
4 changes: 3 additions & 1 deletion factory/runType/interface.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package runType

import (
"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc"
"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/transactions"
)

// RunTypeComponentsCreator is the interface for creating run type components
type RunTypeComponentsCreator interface {
Create() *runTypeComponents
Create() (*runTypeComponents, error)
IsInterfaceNil() bool
}

Expand All @@ -28,6 +29,7 @@ type RunTypeComponentsHandler interface {
type RunTypeComponentsHolder interface {
TxHashExtractorCreator() transactions.TxHashExtractor
RewardTxDataCreator() transactions.RewardTxDataHandler
IndexTokensHandlerCreator() elasticproc.IndexTokensHandler
Create() error
Close() error
CheckSubcomponents() error
Expand Down
6 changes: 4 additions & 2 deletions factory/runType/runTypeComponents.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package runType

import (
"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc"
"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/transactions"
)

type runTypeComponents struct {
txHashExtractor transactions.TxHashExtractor
rewardTxData transactions.RewardTxDataHandler
txHashExtractor transactions.TxHashExtractor
rewardTxData transactions.RewardTxDataHandler
indexTokensHandler elasticproc.IndexTokensHandler
}

// Close does nothing
Expand Down
10 changes: 6 additions & 4 deletions factory/runType/runTypeComponentsFactory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package runType

import (
"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/tokens"
"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/transactions"
)

Expand All @@ -12,11 +13,12 @@ func NewRunTypeComponentsFactory() *runTypeComponentsFactory {
}

// Create will create the run type components
func (rtcf *runTypeComponentsFactory) Create() *runTypeComponents {
func (rtcf *runTypeComponentsFactory) Create() (*runTypeComponents, error) {
return &runTypeComponents{
txHashExtractor: transactions.NewTxHashExtractor(),
rewardTxData: transactions.NewRewardTxData(),
}
txHashExtractor: transactions.NewTxHashExtractor(),
rewardTxData: transactions.NewRewardTxData(),
indexTokensHandler: tokens.NewDisabledIndexTokensHandler(),
}, nil
}

// IsInterfaceNil returns true if there is no value under the interface
Expand Down
21 changes: 20 additions & 1 deletion factory/runType/runTypeComponentsHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/multiversx/mx-chain-core-go/core/check"

"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc"
"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/transactions"
)

Expand Down Expand Up @@ -34,7 +35,10 @@ func NewManagedRunTypeComponents(rtc RunTypeComponentsCreator) (*managedRunTypeC

// Create will create the managed components
func (mrtc *managedRunTypeComponents) Create() error {
rtc := mrtc.factory.Create()
rtc, err := mrtc.factory.Create()
if err != nil {
return err
}

mrtc.mutRunTypeCoreComponents.Lock()
mrtc.runTypeComponents = rtc
Expand Down Expand Up @@ -75,6 +79,9 @@ func (mrtc *managedRunTypeComponents) CheckSubcomponents() error {
if check.IfNil(mrtc.rewardTxData) {
return transactions.ErrNilRewardTxDataHandler
}
if check.IfNil(mrtc.indexTokensHandler) {
return transactions.ErrNilIndexTokensHandler
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe you should move transactions.ErrNilIndexTokensHandler in another package, because it is not relayed with the transactions package

}
return nil
}

Expand Down Expand Up @@ -102,6 +109,18 @@ func (mrtc *managedRunTypeComponents) RewardTxDataCreator() transactions.RewardT
return mrtc.runTypeComponents.rewardTxData
}

// IndexTokensHandlerCreator returns the index tokens handler
func (mrtc *managedRunTypeComponents) IndexTokensHandlerCreator() elasticproc.IndexTokensHandler {
mrtc.mutRunTypeCoreComponents.Lock()
defer mrtc.mutRunTypeCoreComponents.Unlock()

if check.IfNil(mrtc.runTypeComponents) {
return nil
}

return mrtc.runTypeComponents.indexTokensHandler
}

// IsInterfaceNil returns true if the interface is nil
func (mrtc *managedRunTypeComponents) IsInterfaceNil() bool {
return mrtc == nil
Expand Down
Loading