Skip to content

Commit

Permalink
Merge pull request #263 from multiversx/indexer-improvments-and-fixes
Browse files Browse the repository at this point in the history
Indexer improvements and fixes
  • Loading branch information
miiu96 authored Feb 26, 2024
2 parents 6a085b6 + c1559c5 commit a33494b
Show file tree
Hide file tree
Showing 18 changed files with 144 additions and 39 deletions.
2 changes: 1 addition & 1 deletion client/elasticClientCommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func elasticBulkRequestResponseHandler(res *esapi.Response) error {
return fmt.Errorf("%s", res.String())
}

bodyBytes, err := ioutil.ReadAll(res.Body)
bodyBytes, err := io.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("%w cannot read elastic response body bytes", err)
}
Expand Down
6 changes: 0 additions & 6 deletions cmd/elasticindexer/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,4 @@ var (
Name: "disable-ansi-color",
Usage: "Boolean option for disabling ANSI colors in the logging system.",
}
importDB = cli.BoolFlag{
Name: "import-db",
Usage: "This flag, when enabled, triggers the indexer to operate in import database mode. In this mode," +
" the indexer excludes the indexing of cross shard transactions received from the source shard. " +
"This flag must be enabled when the observers are in import database mode.",
}
)
17 changes: 14 additions & 3 deletions cmd/elasticindexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ VERSION:
`
)

// appVersion should be populated at build time using ldflags
// Usage examples:
// linux/mac:
//
// go build -v -ldflags="-X main.appVersion=$(git describe --tags --long --dirty)"
//
// windows:
//
// for /f %i in ('git describe --tags --long --dirty') do set VERS=%i
// go build -v -ldflags="-X main.version=%VERS%"
var version = "undefined"

func main() {
app := cli.NewApp()
cli.AppHelpTemplate = helpTemplate
Expand All @@ -51,7 +63,6 @@ func main() {
logLevel,
logSaveFile,
disableAnsiColor,
importDB,
}
app.Authors = []cli.Author{
{
Expand All @@ -60,6 +71,7 @@ func main() {
},
}

app.Version = version
app.Action = startIndexer

err := app.Run(os.Args)
Expand All @@ -85,9 +97,8 @@ func startIndexer(ctx *cli.Context) error {
return fmt.Errorf("%w while initializing the logger", err)
}

importDBMode := ctx.GlobalBool(importDB.Name)
statusMetrics := metrics.NewStatusMetrics()
wsHost, err := factory.CreateWsIndexer(cfg, clusterCfg, importDBMode, statusMetrics)
wsHost, err := factory.CreateWsIndexer(cfg, clusterCfg, statusMetrics, ctx.App.Version)
if err != nil {
return fmt.Errorf("%w while creating the indexer", err)
}
Expand Down
6 changes: 6 additions & 0 deletions data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,9 @@ type ResponseScroll struct {
} `json:"hits"`
} `json:"hits"`
}

// KeyValueObj is the dto for values index
type KeyValueObj struct {
Key string `json:"key"`
Value string `json:"value"`
}
8 changes: 4 additions & 4 deletions factory/wsIndexerFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import (
var log = logger.GetOrCreate("elasticindexer")

// CreateWsIndexer will create a new instance of wsindexer.WSClient
func CreateWsIndexer(cfg config.Config, clusterCfg config.ClusterConfig, importDB bool, statusMetrics core.StatusMetricsHandler) (wsindexer.WSClient, error) {
func CreateWsIndexer(cfg config.Config, clusterCfg config.ClusterConfig, statusMetrics core.StatusMetricsHandler, version string) (wsindexer.WSClient, error) {
wsMarshaller, err := factoryMarshaller.NewMarshalizer(clusterCfg.Config.WebSocket.DataMarshallerType)
if err != nil {
return nil, err
}

dataIndexer, err := createDataIndexer(cfg, clusterCfg, wsMarshaller, importDB, statusMetrics)
dataIndexer, err := createDataIndexer(cfg, clusterCfg, wsMarshaller, statusMetrics, version)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -55,8 +55,8 @@ func createDataIndexer(
cfg config.Config,
clusterCfg config.ClusterConfig,
wsMarshaller marshal.Marshalizer,
importDB bool,
statusMetrics core.StatusMetricsHandler,
version string,
) (wsindexer.DataIndexer, error) {
marshaller, err := factoryMarshaller.NewMarshalizer(cfg.Config.Marshaller.Type)
if err != nil {
Expand Down Expand Up @@ -88,8 +88,8 @@ func createDataIndexer(
AddressPubkeyConverter: addressPubkeyConverter,
ValidatorPubkeyConverter: validatorPubkeyConverter,
HeaderMarshaller: wsMarshaller,
ImportDB: importDB,
StatusMetrics: statusMetrics,
Version: version,
})
}

Expand Down
14 changes: 7 additions & 7 deletions integrationtests/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,20 @@ var (
pubKeyConverter, _ = pubkeyConverter.NewBech32PubkeyConverter(32, addressPrefix)
)

//nolint
// nolint
func setLogLevelDebug() {
_ = logger.SetLogLevel("process:DEBUG")
}

//nolint
// nolint
func createESClient(url string) (elasticproc.DatabaseClientHandler, error) {
return client.NewElasticClient(elasticsearch.Config{
Addresses: []string{url},
Logger: &logging.CustomLogger{},
})
}

//nolint
// nolint
func decodeAddress(address string) []byte {
decoded, err := pubKeyConverter.Decode(address)
log.LogIfError(err, "address", address)
Expand All @@ -58,22 +58,22 @@ func CreateElasticProcessor(
DBClient: esClient,
EnabledIndexes: []string{dataindexer.TransactionsIndex, dataindexer.LogsIndex, dataindexer.AccountsESDTIndex, dataindexer.ScResultsIndex,
dataindexer.ReceiptsIndex, dataindexer.BlockIndex, dataindexer.AccountsIndex, dataindexer.TokensIndex, dataindexer.TagsIndex,
dataindexer.OperationsIndex, dataindexer.DelegatorsIndex, dataindexer.ESDTsIndex, dataindexer.SCDeploysIndex, dataindexer.MiniblocksIndex},
dataindexer.OperationsIndex, dataindexer.DelegatorsIndex, dataindexer.ESDTsIndex, dataindexer.SCDeploysIndex, dataindexer.MiniblocksIndex, dataindexer.ValuesIndex},
Denomination: 18,
}

return factory.CreateElasticProcessor(args)
}

//nolint
// nolint
func readExpectedResult(path string) string {
jsonFile, _ := os.Open(path)
byteValue, _ := ioutil.ReadAll(jsonFile)

return string(byteValue)
}

//nolint
// nolint
func getElementFromSlice(path string, index int) string {
fileBytes := readExpectedResult(path)
slice := make([]map[string]interface{}, 0)
Expand All @@ -83,7 +83,7 @@ func getElementFromSlice(path string, index int) string {
return string(res)
}

//nolint
// nolint
func getIndexMappings(index string) (string, error) {
u, _ := url.Parse(esURL)
u.Path = path.Join(u.Path, index, "_mappings")
Expand Down
39 changes: 39 additions & 0 deletions integrationtests/valuesIndex_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//go:build integrationtests

package integrationtests

import (
"context"
"fmt"
"testing"

"github.com/multiversx/mx-chain-es-indexer-go/mock"
indexerData "github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer"
"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/factory"
"github.com/stretchr/testify/require"
)

func TestCheckVersionIsIndexer(t *testing.T) {
esClient, err := createESClient(esURL)
require.Nil(t, err)

version := "v1.4.5"
args := factory.ArgElasticProcessorFactory{
Marshalizer: &mock.MarshalizerMock{},
Hasher: &mock.HasherMock{},
AddressPubkeyConverter: pubKeyConverter,
ValidatorPubkeyConverter: mock.NewPubkeyConverterMock(32),
DBClient: esClient,
Denomination: 18,
Version: version,
EnabledIndexes: []string{indexerData.ValuesIndex},
}

_, err = factory.CreateElasticProcessor(args)
require.Nil(t, err)

genericResponse := &GenericResponse{}
err = esClient.DoMultiGet(context.Background(), []string{"indexer-version"}, indexerData.ValuesIndex, true, genericResponse)
require.Nil(t, err)
require.Equal(t, fmt.Sprintf(`{"key":"indexer-version","value":"%s"}`, version), string(genericResponse.Docs[0].Source))
}
2 changes: 2 additions & 0 deletions process/dataindexer/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ const (
OperationsIndex = "operations"
// ESDTsIndex is the Elasticsearch index for esdt tokens
ESDTsIndex = "esdts"
// ValuesIndex is the Elasticsearch index for extra indexer information
ValuesIndex = "values"

// TransactionsPolicy is the Elasticsearch policy for the transactions
TransactionsPolicy = "transactions_policy"
Expand Down
10 changes: 4 additions & 6 deletions process/elasticproc/accounts/serialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,10 @@ func prepareSerializedAccountInfo(
if ('create' == ctx.op) {
ctx._source = params.account
} else {
if (ctx._source.containsKey('timestamp')) {
if (ctx._source.timestamp <= params.account.timestamp) {
ctx._source = params.account
}
} else {
ctx._source = params.account
if ((!ctx._source.containsKey('timestamp')) || (ctx._source.timestamp <= params.account.timestamp) ) {
params.account.forEach((key, value) -> {
ctx._source[key] = value;
});
}
}
`
Expand Down
8 changes: 4 additions & 4 deletions process/elasticproc/accounts/serialize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestSerializeAccounts(t *testing.T) {
require.Equal(t, 1, len(buffSlice.Buffers()))

expectedRes := `{ "update" : {"_index": "accounts", "_id" : "addr1" } }
{"scripted_upsert": true, "script": {"source": "if ('create' == ctx.op) {ctx._source = params.account} else {if (ctx._source.containsKey('timestamp')) {if (ctx._source.timestamp <= params.account.timestamp) {ctx._source = params.account}} else {ctx._source = params.account}}","lang": "painless","params": { "account": {"address":"addr1","nonce":1,"balance":"50","balanceNum":0.1,"shardID":0} }},"upsert": {}}
{"scripted_upsert": true, "script": {"source": "if ('create' == ctx.op) {ctx._source = params.account} else {if ((!ctx._source.containsKey('timestamp')) || (ctx._source.timestamp <= params.account.timestamp) ) {params.account.forEach((key, value) -> {ctx._source[key] = value;});}}","lang": "painless","params": { "account": {"address":"addr1","nonce":1,"balance":"50","balanceNum":0.1,"shardID":0} }},"upsert": {}}
`
require.Equal(t, expectedRes, buffSlice.Buffers()[0].String())
}
Expand All @@ -81,7 +81,7 @@ func TestSerializeAccountsESDTNonceZero(t *testing.T) {
require.Equal(t, 1, len(buffSlice.Buffers()))

expectedRes := `{ "update" : {"_index": "accountsesdt", "_id" : "addr1-token-abcd-00" } }
{"scripted_upsert": true, "script": {"source": "if ('create' == ctx.op) {ctx._source = params.account} else {if (ctx._source.containsKey('timestamp')) {if (ctx._source.timestamp <= params.account.timestamp) {ctx._source = params.account}} else {ctx._source = params.account}}","lang": "painless","params": { "account": {"address":"addr1","nonce":1,"balance":"10000000000000","balanceNum":1,"token":"token-abcd","properties":"000","timestamp":123,"shardID":0} }},"upsert": {}}
{"scripted_upsert": true, "script": {"source": "if ('create' == ctx.op) {ctx._source = params.account} else {if ((!ctx._source.containsKey('timestamp')) || (ctx._source.timestamp <= params.account.timestamp) ) {params.account.forEach((key, value) -> {ctx._source[key] = value;});}}","lang": "painless","params": { "account": {"address":"addr1","nonce":1,"balance":"10000000000000","balanceNum":1,"token":"token-abcd","properties":"000","timestamp":123,"shardID":0} }},"upsert": {}}
`
require.Equal(t, expectedRes, buffSlice.Buffers()[0].String())
}
Expand All @@ -107,7 +107,7 @@ func TestSerializeAccountsESDT(t *testing.T) {
require.Equal(t, 1, len(buffSlice.Buffers()))

expectedRes := `{ "update" : {"_index": "accountsesdt", "_id" : "addr1-token-0001-05" } }
{"scripted_upsert": true, "script": {"source": "if ('create' == ctx.op) {ctx._source = params.account} else {if (ctx._source.containsKey('timestamp')) {if (ctx._source.timestamp <= params.account.timestamp) {ctx._source = params.account}} else {ctx._source = params.account}}","lang": "painless","params": { "account": {"address":"addr1","nonce":1,"balance":"10000000000000","balanceNum":1,"token":"token-0001","tokenNonce":5,"properties":"000","shardID":0} }},"upsert": {}}
{"scripted_upsert": true, "script": {"source": "if ('create' == ctx.op) {ctx._source = params.account} else {if ((!ctx._source.containsKey('timestamp')) || (ctx._source.timestamp <= params.account.timestamp) ) {params.account.forEach((key, value) -> {ctx._source[key] = value;});}}","lang": "painless","params": { "account": {"address":"addr1","nonce":1,"balance":"10000000000000","balanceNum":1,"token":"token-0001","tokenNonce":5,"properties":"000","shardID":0} }},"upsert": {}}
`
require.Equal(t, expectedRes, buffSlice.Buffers()[0].String())
}
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestSerializeAccountsNFTWithMedaData(t *testing.T) {
require.Equal(t, 1, len(buffSlice.Buffers()))

expectedRes := `{ "update" : {"_index": "accountsesdt", "_id" : "addr1-token-0001-16" } }
{"scripted_upsert": true, "script": {"source": "if ('create' == ctx.op) {ctx._source = params.account} else {if (ctx._source.containsKey('timestamp')) {if (ctx._source.timestamp <= params.account.timestamp) {ctx._source = params.account}} else {ctx._source = params.account}}","lang": "painless","params": { "account": {"address":"addr1","nonce":1,"balance":"10000000000000","balanceNum":1,"token":"token-0001","identifier":"token-0001-5","tokenNonce":22,"properties":"000","data":{"name":"nft","creator":"010101","royalties":1,"hash":"aGFzaA==","uris":["dXJp"],"tags":["test","free","fun"],"attributes":"dGFnczp0ZXN0LGZyZWUsZnVuO2Rlc2NyaXB0aW9uOlRoaXMgaXMgYSB0ZXN0IGRlc2NyaXB0aW9uIGZvciBhbiBhd2Vzb21lIG5mdA==","metadata":"metadata-test","nonEmptyURIs":true,"whiteListedStorage":false},"shardID":0} }},"upsert": {}}
{"scripted_upsert": true, "script": {"source": "if ('create' == ctx.op) {ctx._source = params.account} else {if ((!ctx._source.containsKey('timestamp')) || (ctx._source.timestamp <= params.account.timestamp) ) {params.account.forEach((key, value) -> {ctx._source[key] = value;});}}","lang": "painless","params": { "account": {"address":"addr1","nonce":1,"balance":"10000000000000","balanceNum":1,"token":"token-0001","identifier":"token-0001-5","tokenNonce":22,"properties":"000","data":{"name":"nft","creator":"010101","royalties":1,"hash":"aGFzaA==","uris":["dXJp"],"tags":["test","free","fun"],"attributes":"dGFnczp0ZXN0LGZyZWUsZnVuO2Rlc2NyaXB0aW9uOlRoaXMgaXMgYSB0ZXN0IGRlc2NyaXB0aW9uIGZvciBhbiBhd2Vzb21lIG5mdA==","metadata":"metadata-test","nonEmptyURIs":true,"whiteListedStorage":false},"shardID":0} }},"upsert": {}}
`
require.Equal(t, expectedRes, buffSlice.Buffers()[0].String())
}
Expand Down
36 changes: 33 additions & 3 deletions process/elasticproc/elasticProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"sync"

Expand All @@ -29,11 +30,11 @@ var (
elasticIndexer.TransactionsIndex, elasticIndexer.BlockIndex, elasticIndexer.MiniblocksIndex, elasticIndexer.RatingIndex, elasticIndexer.RoundsIndex, elasticIndexer.ValidatorsIndex,
elasticIndexer.AccountsIndex, elasticIndexer.AccountsHistoryIndex, elasticIndexer.ReceiptsIndex, elasticIndexer.ScResultsIndex, elasticIndexer.AccountsESDTHistoryIndex, elasticIndexer.AccountsESDTIndex,
elasticIndexer.EpochInfoIndex, elasticIndexer.SCDeploysIndex, elasticIndexer.TokensIndex, elasticIndexer.TagsIndex, elasticIndexer.LogsIndex, elasticIndexer.DelegatorsIndex, elasticIndexer.OperationsIndex,
elasticIndexer.ESDTsIndex,
elasticIndexer.ESDTsIndex, elasticIndexer.ValuesIndex,
}
)

type objectsMap = map[string]interface{}
const versionStr = "indexer-version"

// ArgElasticProcessor holds all dependencies required by the elasticProcessor in order to create
// new instances
Expand All @@ -53,6 +54,7 @@ type ArgElasticProcessor struct {
DBClient DatabaseClientHandler
LogsAndEventsProc DBLogsAndEventsHandler
OperationsProc OperationsHandler
Version string
}

type elasticProcessor struct {
Expand Down Expand Up @@ -97,7 +99,9 @@ func NewElasticProcessor(arguments *ArgElasticProcessor) (*elasticProcessor, err
return nil, err
}

return ei, nil
err = ei.indexVersion(arguments.Version)

return ei, err
}

// TODO move all the index create part in a new component
Expand Down Expand Up @@ -134,6 +138,32 @@ func (ei *elasticProcessor) init(useKibana bool, indexTemplates, _ map[string]*b
return nil
}

func (ei *elasticProcessor) indexVersion(version string) error {
if version == "" {
log.Debug("ei.elasticProcessor indexer version is empty")
return nil
}

keyValueObj := &data.KeyValueObj{
Key: versionStr,
Value: version,
}

meta := []byte(fmt.Sprintf(`{ "index" : { "_index":"%s", "_id" : "%s" } }%s`, elasticIndexer.ValuesIndex, versionStr, "\n"))
keyValueObjBytes, err := json.Marshal(keyValueObj)
if err != nil {
return err
}

buffSlice := data.NewBufferSlice(0)
err = buffSlice.PutData(meta, keyValueObjBytes)
if err != nil {
return err
}

return ei.elasticClient.DoBulkRequest(context.Background(), buffSlice.Buffers()[0], "")
}

// nolint
func (ei *elasticProcessor) createIndexPolicies(indexPolicies map[string]*bytes.Buffer) error {
indexesPolicies := []string{elasticIndexer.TransactionsPolicy, elasticIndexer.BlockPolicy, elasticIndexer.MiniblocksPolicy, elasticIndexer.RatingPolicy, elasticIndexer.RoundsPolicy, elasticIndexer.ValidatorsPolicy,
Expand Down
2 changes: 2 additions & 0 deletions process/elasticproc/factory/elasticProcessorFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ArgElasticProcessorFactory struct {
ValidatorPubkeyConverter core.PubkeyConverter
DBClient elasticproc.DatabaseClientHandler
EnabledIndexes []string
Version string
Denomination int
BulkRequestMaxSize int
UseKibana bool
Expand Down Expand Up @@ -120,6 +121,7 @@ func CreateElasticProcessor(arguments ArgElasticProcessorFactory) (dataindexer.E
IndexPolicies: indexPolicies,
OperationsProc: operationsProc,
ImportDB: arguments.ImportDB,
Version: arguments.Version,
}

return elasticproc.NewElasticProcessor(args)
Expand Down
1 change: 1 addition & 0 deletions process/elasticproc/templatesAndPolicies/noKibana.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (tr *templatesAndPolicyReaderNoKibana) GetElasticTemplatesAndPolicies() (ma
indexTemplates[indexer.DelegatorsIndex] = noKibana.Delegators.ToBuffer()
indexTemplates[indexer.OperationsIndex] = noKibana.Operations.ToBuffer()
indexTemplates[indexer.ESDTsIndex] = noKibana.ESDTs.ToBuffer()
indexTemplates[indexer.ValuesIndex] = noKibana.Values.ToBuffer()

return indexTemplates, indexPolicies, nil
}
2 changes: 1 addition & 1 deletion process/elasticproc/templatesAndPolicies/noKibana_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ func TestTemplatesAndPolicyReaderNoKibana_GetElasticTemplatesAndPolicies(t *test
templates, policies, err := reader.GetElasticTemplatesAndPolicies()
require.Nil(t, err)
require.Len(t, policies, 0)
require.Len(t, templates, 21)
require.Len(t, templates, 22)
}
2 changes: 2 additions & 0 deletions process/factory/indexerFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type ArgsIndexerFactory struct {
UserName string
Password string
TemplatesPath string
Version string
EnabledIndexes []string
HeaderMarshaller marshal.Marshalizer
Marshalizer marshal.Marshalizer
Expand Down Expand Up @@ -95,6 +96,7 @@ func createElasticProcessor(args ArgsIndexerFactory) (dataindexer.ElasticProcess
EnabledIndexes: args.EnabledIndexes,
BulkRequestMaxSize: args.BulkRequestMaxSize,
ImportDB: args.ImportDB,
Version: args.Version,
}

return factory.CreateElasticProcessor(argsElasticProcFac)
Expand Down
Loading

0 comments on commit a33494b

Please sign in to comment.