diff --git a/client/elasticClient.go b/client/elasticClient.go
index 3f0a66ed..64f7e30f 100644
--- a/client/elasticClient.go
+++ b/client/elasticClient.go
@@ -153,8 +153,14 @@ func (ec *elasticClient) DoQueryRemove(ctx context.Context, index string, body *
 		log.Warn("elasticClient.doRefresh", "cannot do refresh", err)
 	}
 
+	writeIndex, err := ec.getWriteIndex(index)
+	if err != nil {
+		log.Warn("elasticClient.getWriteIndex", "cannot get write index", err)
+		return err
+	}
+
 	res, err := ec.client.DeleteByQuery(
-		[]string{index},
+		[]string{writeIndex},
 		body,
 		ec.client.DeleteByQuery.WithIgnoreUnavailable(true),
 		ec.client.DeleteByQuery.WithConflicts(esConflictsPolicy),
@@ -323,6 +329,39 @@ func (ec *elasticClient) createAlias(alias string, index string) error {
 	return parseResponse(res, nil, elasticDefaultErrorResponseHandler)
 }
 
+func (ec *elasticClient) getWriteIndex(alias string) (string, error) {
+	res, err := ec.client.Indices.GetAlias(
+		ec.client.Indices.GetAlias.WithIndex(alias),
+	)
+	if err != nil {
+		return "", err
+	}
+
+	var indexData map[string]struct {
+		Aliases map[string]struct {
+			IsWriteIndex bool `json:"is_write_index"`
+		} `json:"aliases"`
+	}
+	err = parseResponse(res, &indexData, elasticDefaultErrorResponseHandler)
+	if err != nil {
+		return "", err
+	}
+
+	for index, details := range indexData {
+		if len(indexData) == 1 {
+			return index, nil
+		}
+
+		for _, indexAlias := range details.Aliases {
+			if indexAlias.IsWriteIndex {
+				return index, nil
+			}
+		}
+	}
+
+	return alias, nil
+}
+
 // UpdateByQuery will update all the documents that match the provided query from the provided index
 func (ec *elasticClient) UpdateByQuery(ctx context.Context, index string, buff *bytes.Buffer) error {
 	reader := bytes.NewReader(buff.Bytes())
diff --git a/client/elasticClientCommon.go b/client/elasticClientCommon.go
index 00850f7d..41ed1034 100644
--- a/client/elasticClientCommon.go
+++ b/client/elasticClientCommon.go
@@ -96,7 +96,7 @@ func elasticBulkRequestResponseHandler(res *esapi.Response) error {
 		return fmt.Errorf("%s", res.String())
 	}
 
-	bodyBytes, err := ioutil.ReadAll(res.Body)
+	bodyBytes, err := io.ReadAll(res.Body)
 	if err != nil {
 		return fmt.Errorf("%w cannot read elastic response body bytes", err)
 	}
diff --git a/client/elasticClient_test.go b/client/elasticClient_test.go
index 96b552ec..c828bb0d 100644
--- a/client/elasticClient_test.go
+++ b/client/elasticClient_test.go
@@ -2,7 +2,7 @@ package client
 
 import (
 	"context"
-	"io/ioutil"
+	"io"
 	"net/http"
 	"net/http/httptest"
 	"os"
@@ -53,7 +53,7 @@ func TestElasticClient_DoMultiGet(t *testing.T) {
 		jsonFile, err := os.Open("./testsData/response-multi-get.json")
 		require.Nil(t, err)
 
-		byteValue, _ := ioutil.ReadAll(jsonFile)
+		byteValue, _ := io.ReadAll(jsonFile)
 		_, _ = w.Write(byteValue)
 	}
 
@@ -75,3 +75,51 @@ func TestElasticClient_DoMultiGet(t *testing.T) {
 	_, ok := resMap["docs"]
 	require.True(t, ok)
 }
+
+func TestElasticClient_GetWriteIndexMultipleIndicesBehind(t *testing.T) {
+	handler := http.NotFound
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		handler(w, r)
+	}))
+	defer ts.Close()
+
+	handler = func(w http.ResponseWriter, r *http.Request) {
+		jsonFile, err := os.Open("./testsData/response-get-alias.json")
+		require.Nil(t, err)
+
+		byteValue, _ := io.ReadAll(jsonFile)
+		_, _ = w.Write(byteValue)
+	}
+
+	esClient, _ := NewElasticClient(elasticsearch.Config{
+		Addresses: []string{ts.URL},
+		Logger:    &logging.CustomLogger{},
+	})
+	res, err := esClient.getWriteIndex("blocks")
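	// The stub above replays testsData/response-get-alias.json (added further down
	// in this diff): a GET _alias payload listing four blocks-* indices of which
	// only blocks-000004 carries "is_write_index": true, so getWriteIndex is
	// expected to resolve the "blocks" alias to "blocks-000004".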
+	require.Nil(t, err)
+	require.Equal(t, "blocks-000004", res)
+}
+
+func TestElasticClient_GetWriteIndexOneIndex(t *testing.T) {
+	handler := http.NotFound
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		handler(w, r)
+	}))
+	defer ts.Close()
+
+	handler = func(w http.ResponseWriter, r *http.Request) {
+		jsonFile, err := os.Open("./testsData/response-get-alias-only-one-index.json")
+		require.Nil(t, err)
+
+		byteValue, _ := io.ReadAll(jsonFile)
+		_, _ = w.Write(byteValue)
+	}
+
+	esClient, _ := NewElasticClient(elasticsearch.Config{
+		Addresses: []string{ts.URL},
+		Logger:    &logging.CustomLogger{},
+	})
+	res, err := esClient.getWriteIndex("delegators")
+	require.Nil(t, err)
+	require.Equal(t, "delegators-000001", res)
+}
diff --git a/client/testsData/response-get-alias-only-one-index.json b/client/testsData/response-get-alias-only-one-index.json
new file mode 100644
index 00000000..a56b11b8
--- /dev/null
+++ b/client/testsData/response-get-alias-only-one-index.json
@@ -0,0 +1,7 @@
+{
+  "delegators-000001" : {
+    "aliases" : {
+      "delegators" : { }
+    }
+  }
+}
diff --git a/client/testsData/response-get-alias.json b/client/testsData/response-get-alias.json
new file mode 100644
index 00000000..2c29231e
--- /dev/null
+++ b/client/testsData/response-get-alias.json
@@ -0,0 +1,30 @@
+{
+  "blocks-000003": {
+    "aliases": {
+      "blocks": {
+        "is_write_index": false
+      }
+    }
+  },
+  "blocks-000004": {
+    "aliases": {
+      "blocks": {
+        "is_write_index": true
+      }
+    }
+  },
+  "blocks-000002": {
+    "aliases": {
+      "blocks": {
+        "is_write_index": false
+      }
+    }
+  },
+  "blocks-000001": {
+    "aliases": {
+      "blocks": {
+        "is_write_index": false
+      }
+    }
+  }
+}
diff --git a/cmd/elasticindexer/config/config.toml b/cmd/elasticindexer/config/config.toml
index 35fa0702..b63329c7 100644
--- a/cmd/elasticindexer/config/config.toml
+++ b/cmd/elasticindexer/config/config.toml
@@ -2,7 +2,7 @@
     available-indices = [
         "rating", "transactions", "blocks", "validators", "miniblocks", "rounds", "accounts", "accountshistory",
         "receipts", "scresults", "accountsesdt", "accountsesdthistory", "epochinfo", "scdeploys", "tokens", "tags",
-        "logs", "delegators", "operations", "esdts"
+        "logs", "delegators", "operations", "esdts", "values", "events"
     ]
     [config.address-converter]
         length = 32
diff --git a/cmd/elasticindexer/flags.go b/cmd/elasticindexer/flags.go
index a8c6ed86..d7e84b70 100644
--- a/cmd/elasticindexer/flags.go
+++ b/cmd/elasticindexer/flags.go
@@ -47,10 +47,4 @@ var (
 		Name:  "disable-ansi-color",
 		Usage: "Boolean option for disabling ANSI colors in the logging system.",
 	}
-	importDB = cli.BoolFlag{
-		Name: "import-db",
-		Usage: "This flag, when enabled, triggers the indexer to operate in import database mode. In this mode," +
-			" the indexer excludes the indexing of cross shard transactions received from the source shard. " +
-			"This flag must be enabled when the observers are in import database mode.",
-	}
 )
diff --git a/cmd/elasticindexer/main.go b/cmd/elasticindexer/main.go
index cea8460b..85246d13 100644
--- a/cmd/elasticindexer/main.go
+++ b/cmd/elasticindexer/main.go
@@ -39,6 +39,18 @@ VERSION:
 `
 )
 
+// version should be populated at build time using ldflags
+// Usage examples:
+// linux/mac:
+//
+//	go build -v -ldflags="-X main.version=$(git describe --tags --long --dirty)"
+//
+// windows:
+//
+//	for /f %i in ('git describe --tags --long --dirty') do set VERS=%i
+//	go build -v -ldflags="-X main.version=%VERS%"
+var version = "undefined"
+
 func main() {
 	app := cli.NewApp()
 	cli.AppHelpTemplate = helpTemplate
@@ -51,7 +63,6 @@ func main() {
 		logLevel,
 		logSaveFile,
 		disableAnsiColor,
-		importDB,
 	}
 	app.Authors = []cli.Author{
 		{
@@ -60,6 +71,7 @@ },
 		},
 	}
 
+	app.Version = version
 	app.Action = startIndexer
 
 	err := app.Run(os.Args)
@@ -85,9 +97,8 @@ func startIndexer(ctx *cli.Context) error {
 		return fmt.Errorf("%w while initializing the logger", err)
 	}
 
-	importDBMode := ctx.GlobalBool(importDB.Name)
 	statusMetrics := metrics.NewStatusMetrics()
-	wsHost, err := factory.CreateWsIndexer(cfg, clusterCfg, importDBMode, statusMetrics)
+	wsHost, err := factory.CreateWsIndexer(cfg, clusterCfg, statusMetrics, ctx.App.Version)
 	if err != nil {
 		return fmt.Errorf("%w while creating the indexer", err)
 	}
diff --git a/data/data.go b/data/data.go
index e17042e6..2830db27 100644
--- a/data/data.go
+++ b/data/data.go
@@ -47,3 +47,9 @@ type ResponseScroll struct {
 		} `json:"hits"`
 	} `json:"hits"`
 }
+
+// KeyValueObj is the dto for the values index
+type KeyValueObj struct {
+	Key   string `json:"key"`
+	Value string `json:"value"`
+}
diff --git a/data/event.go b/data/event.go
new file mode 100644
index 00000000..f3e22668
--- /dev/null
+++ b/data/event.go
@@ -0,0 +1,20 @@
+package data
+
+import "time"
+
+// LogEvent is the dto for the log event structure
+type LogEvent struct {
+	ID             string        `json:"-"`
+	TxHash         string        `json:"txHash"`
+	OriginalTxHash string        `json:"originalTxHash,omitempty"`
+	LogAddress     string        `json:"logAddress"`
+	Address        string        `json:"address"`
+	Identifier     string        `json:"identifier"`
+	Data           string        `json:"data,omitempty"`
+	AdditionalData []string      `json:"additionalData,omitempty"`
+	Topics         []string      `json:"topics"`
+	Order          int           `json:"order"`
+	TxOrder        int           `json:"txOrder"`
+	ShardID        uint32        `json:"shardID"`
+	Timestamp      time.Duration `json:"timestamp,omitempty"`
+}
diff --git a/data/logs.go b/data/logs.go
index d27fd466..2daf0179 100644
--- a/data/logs.go
+++ b/data/logs.go
@@ -37,4 +37,6 @@ type PreparedLogsResults struct {
 	TokensInfo              []*TokenInfo
 	NFTsDataUpdates         []*NFTDataUpdate
 	TokenRolesAndProperties *tokeninfo.TokenRolesAndProperties
+	DBLogs                  []*Logs
+	DBEvents                []*LogEvent
 }
diff --git a/data/scresult.go b/data/scresult.go
index a2f595ce..b246528d 100644
--- a/data/scresult.go
+++ b/data/scresult.go
@@ -39,6 +39,7 @@ type ScResult struct {
 	CanBeIgnored       bool   `json:"canBeIgnored,omitempty"`
 	OriginalSender     string `json:"originalSender,omitempty"`
 	HasLogs            bool   `json:"hasLogs,omitempty"`
+	ExecutionOrder     int    `json:"-"`
 	SenderAddressBytes []byte `json:"-"`
 	InitialTxGasUsed   uint64 `json:"-"`
 	InitialTxFee       string `json:"-"`
diff --git a/data/transaction.go b/data/transaction.go
index 04b7364f..d5cefd56 100644
--- a/data/transaction.go
+++ b/data/transaction.go
@@ -5,8 +5,8 @@ import (
 )
 
 // Transaction is a structure containing all the fields that need
-// to be saved for a transaction. 
It has all the default fields -// plus some extra information for ease of search and filter +// to be saved for a transaction. It has all the default fields +// plus some extra information for ease of search and filter type Transaction struct { MBHash string `json:"miniBlockHash"` Nonce uint64 `json:"nonce"` @@ -48,6 +48,7 @@ type Transaction struct { GuardianSignature string `json:"guardianSignature,omitempty"` ErrorEvent bool `json:"errorEvent,omitempty"` CompletedEvent bool `json:"completedEvent,omitempty"` + ExecutionOrder int `json:"-"` SmartContractResults []*ScResult `json:"-"` Hash string `json:"-"` BlockHash string `json:"-"` diff --git a/docker-compose.yml b/docker-compose.yml index 7b03b074..970f0b40 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,3 @@ -version: "3.0" services: elasticsearch: container_name: es-container diff --git a/factory/wsIndexerFactory.go b/factory/wsIndexerFactory.go index eaae1fdc..90beb5a9 100644 --- a/factory/wsIndexerFactory.go +++ b/factory/wsIndexerFactory.go @@ -17,13 +17,13 @@ import ( var log = logger.GetOrCreate("elasticindexer") // CreateWsIndexer will create a new instance of wsindexer.WSClient -func CreateWsIndexer(cfg config.Config, clusterCfg config.ClusterConfig, importDB bool, statusMetrics core.StatusMetricsHandler) (wsindexer.WSClient, error) { +func CreateWsIndexer(cfg config.Config, clusterCfg config.ClusterConfig, statusMetrics core.StatusMetricsHandler, version string) (wsindexer.WSClient, error) { wsMarshaller, err := factoryMarshaller.NewMarshalizer(clusterCfg.Config.WebSocket.DataMarshallerType) if err != nil { return nil, err } - dataIndexer, err := createDataIndexer(cfg, clusterCfg, wsMarshaller, importDB, statusMetrics) + dataIndexer, err := createDataIndexer(cfg, clusterCfg, wsMarshaller, statusMetrics, version) if err != nil { return nil, err } @@ -55,8 +55,8 @@ func createDataIndexer( cfg config.Config, clusterCfg config.ClusterConfig, wsMarshaller marshal.Marshalizer, - importDB bool, statusMetrics core.StatusMetricsHandler, + version string, ) (wsindexer.DataIndexer, error) { marshaller, err := factoryMarshaller.NewMarshalizer(cfg.Config.Marshaller.Type) if err != nil { @@ -88,8 +88,8 @@ func createDataIndexer( AddressPubkeyConverter: addressPubkeyConverter, ValidatorPubkeyConverter: validatorPubkeyConverter, HeaderMarshaller: wsMarshaller, - ImportDB: importDB, StatusMetrics: statusMetrics, + Version: version, }) } diff --git a/go.mod b/go.mod index ddaf2db4..8daf717f 100644 --- a/go.mod +++ b/go.mod @@ -6,10 +6,10 @@ require ( github.com/elastic/go-elasticsearch/v7 v7.12.0 github.com/gin-contrib/cors v1.4.0 github.com/gin-gonic/gin v1.9.1 - github.com/multiversx/mx-chain-communication-go v1.0.12 - github.com/multiversx/mx-chain-core-go v1.2.19-0.20240118100536-661f5af64039 - github.com/multiversx/mx-chain-logger-go v1.0.14-0.20231129101244-c44fa1c79b03 - github.com/multiversx/mx-chain-vm-common-go v1.5.10-0.20240118100602-3d0d315083e8 + github.com/multiversx/mx-chain-communication-go v1.0.15-0.20240508074652-e128a1c05c8e + github.com/multiversx/mx-chain-core-go v1.2.21-0.20240514102932-72b1cac83784 + github.com/multiversx/mx-chain-logger-go v1.0.15-0.20240508072523-3f00a726af57 + github.com/multiversx/mx-chain-vm-common-go v1.5.13-0.20240514104734-6dc768a4c5aa github.com/prometheus/client_model v0.4.0 github.com/prometheus/common v0.37.0 github.com/stretchr/testify v1.8.4 diff --git 
a/go.sum b/go.sum index 7d313d9e..74fbbfee 100644 --- a/go.sum +++ b/go.sum @@ -247,15 +247,15 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= -github.com/multiversx/mx-chain-communication-go v1.0.12 h1:67WOaf87gpwouydD1AAOHw5LMGZh7NfITrp/KqFY3Tw= -github.com/multiversx/mx-chain-communication-go v1.0.12/go.mod h1:+oaUowpq+SqrEmAsMPGwhz44g7L81loWb6AiNQU9Ms4= -github.com/multiversx/mx-chain-core-go v1.2.19-0.20240118100536-661f5af64039 h1:rusWamF1HWGAHsmdz/5TSfxHpe36HHvXCEKz3tic2yo= -github.com/multiversx/mx-chain-core-go v1.2.19-0.20240118100536-661f5af64039/go.mod h1:P/YBoFnt25XUaCQ7Q/SD15vhnc9yV5JDhHxyFO9P8Z0= -github.com/multiversx/mx-chain-crypto-go v1.2.8 h1:wOgVlUaO5X4L8iEbFjcQcL8SZvv6WZ7LqH73BiRPhxU= -github.com/multiversx/mx-chain-logger-go v1.0.14-0.20231129101244-c44fa1c79b03 h1:krjJTyN9jrFTK0goMGFdgvJGy6bYSqe8EtI/HCceUmU= -github.com/multiversx/mx-chain-logger-go v1.0.14-0.20231129101244-c44fa1c79b03/go.mod h1:fH/fR/GEBsDjPkBoZDVJMoYo2HhlA7++DP6QfITJ1N8= -github.com/multiversx/mx-chain-vm-common-go v1.5.10-0.20240118100602-3d0d315083e8 h1:0/k3n7Ak66oU1ygy8XR+4Q53DGmhS0VrMdKcZO433FI= -github.com/multiversx/mx-chain-vm-common-go v1.5.10-0.20240118100602-3d0d315083e8/go.mod h1:1ZUnRk7l/eTOyu2DOxy6zfEn1SAM/1u0nHUXE1Jw9xY= +github.com/multiversx/mx-chain-communication-go v1.0.15-0.20240508074652-e128a1c05c8e h1:Tsmwhu+UleE+l3buPuqXSKTqfu5FbPmzQ4MjMoUvCWA= +github.com/multiversx/mx-chain-communication-go v1.0.15-0.20240508074652-e128a1c05c8e/go.mod h1:2yXl18wUbuV3cRZr7VHxM1xo73kTaC1WUcu2kx8R034= +github.com/multiversx/mx-chain-core-go v1.2.21-0.20240514102932-72b1cac83784 h1:Dxm8MIIbEQW9hUIjfiVFm7mR+UbOSkb7xhMtXHP7dmk= +github.com/multiversx/mx-chain-core-go v1.2.21-0.20240514102932-72b1cac83784/go.mod h1:P/YBoFnt25XUaCQ7Q/SD15vhnc9yV5JDhHxyFO9P8Z0= +github.com/multiversx/mx-chain-crypto-go v1.2.12-0.20240508074452-cc21c1b505df h1:clihfi78bMEOWk/qw6WA4uQbCM2e2NGliqswLAvw19k= +github.com/multiversx/mx-chain-logger-go v1.0.15-0.20240508072523-3f00a726af57 h1:g9t410dqjcb7UUptbVd/H6Ua12sEzWU4v7VplyNvRZ0= +github.com/multiversx/mx-chain-logger-go v1.0.15-0.20240508072523-3f00a726af57/go.mod h1:cY6CIXpndW5g5PTPn4WzPwka/UBEf+mgw+PSY5pHGAU= +github.com/multiversx/mx-chain-vm-common-go v1.5.13-0.20240514104734-6dc768a4c5aa h1:CJyBXV0PHDE3/bjJU3quoNBjOmGjrn/EBqU7wBdcqhk= +github.com/multiversx/mx-chain-vm-common-go v1.5.13-0.20240514104734-6dc768a4c5aa/go.mod h1:/scqfBjA912V3Z2sPlkRVxWAE3puVNS2adQegm4LjCc= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= diff --git a/integrationtests/accountsBalanceNftTransfer_test.go b/integrationtests/accountsBalanceNftTransfer_test.go index 7dbd2023..aaf44e00 100644 --- a/integrationtests/accountsBalanceNftTransfer_test.go +++ b/integrationtests/accountsBalanceNftTransfer_test.go @@ -34,6 +34,7 @@ 
func createOutportBlockWithHeader( TransactionPool: pool, AlteredAccounts: coreAlteredAccounts, NumberOfShards: numOfShards, + ShardID: header.GetShardID(), }, Header: header, } diff --git a/integrationtests/logsCrossShard_test.go b/integrationtests/logsCrossShard_test.go index aa62f94a..7c96688f 100644 --- a/integrationtests/logsCrossShard_test.go +++ b/integrationtests/logsCrossShard_test.go @@ -30,13 +30,21 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) { Round: 50, TimeStamp: 5040, } - body := &dataBlock.Body{} + + txHash := []byte("cross-log") + logID := hex.EncodeToString(txHash) + + body := &dataBlock.Body{ + MiniBlocks: []*dataBlock.MiniBlock{ + { + TxHashes: [][]byte{txHash}, + }, + }, + } address1 := "erd1ju8pkvg57cwdmjsjx58jlmnuf4l9yspstrhr9tgsrt98n9edpm2qtlgy99" address2 := "erd1w7jyzuj6cv4ngw8luhlkakatjpmjh3ql95lmxphd3vssc4vpymks6k5th7" - logID := hex.EncodeToString([]byte("cross-log")) - // index on source pool := &outport.TransactionPool{ Logs: []*outport.LogData{ @@ -55,6 +63,12 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) { }, }, }, + Transactions: map[string]*outport.TxInfo{ + logID: { + Transaction: &transaction.Transaction{}, + ExecutionOrder: 0, + }, + }, } err = esProc.SaveTransactions(createOutportBlockWithHeader(body, header, pool, map[string]*alteredAccount.AlteredAccount{}, testNumOfShards)) require.Nil(t, err) @@ -68,10 +82,20 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) { string(genericResponse.Docs[0].Source), ) + event1ID := logID + "-0-0" + ids = []string{event1ID} + err = esClient.DoMultiGet(context.Background(), ids, indexerdata.EventsIndex, true, genericResponse) + require.Nil(t, err) + require.JSONEq(t, + readExpectedResult("./testdata/logsCrossShard/event-transfer-source-first.json"), + string(genericResponse.Docs[0].Source), + ) + // INDEX ON DESTINATION header = &dataBlock.Header{ Round: 50, TimeStamp: 6040, + ShardID: 1, } pool = &outport.TransactionPool{ Logs: []*outport.LogData{ @@ -96,10 +120,17 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) { }, }, }, + Transactions: map[string]*outport.TxInfo{ + logID: { + Transaction: &transaction.Transaction{}, + ExecutionOrder: 0, + }, + }, } err = esProc.SaveTransactions(createOutportBlockWithHeader(body, header, pool, map[string]*alteredAccount.AlteredAccount{}, testNumOfShards)) require.Nil(t, err) + ids = []string{logID} err = esClient.DoMultiGet(context.Background(), ids, indexerdata.LogsIndex, true, genericResponse) require.Nil(t, err) require.JSONEq(t, @@ -107,6 +138,19 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) { string(genericResponse.Docs[0].Source), ) + event2ID, event3ID := logID+"-1-0", logID+"-1-1" + ids = []string{event2ID, event3ID} + err = esClient.DoMultiGet(context.Background(), ids, indexerdata.EventsIndex, true, genericResponse) + require.Nil(t, err) + require.JSONEq(t, + readExpectedResult("./testdata/logsCrossShard/event-transfer-destination.json"), + string(genericResponse.Docs[0].Source), + ) + require.JSONEq(t, + readExpectedResult("./testdata/logsCrossShard/event-do-something.json"), + string(genericResponse.Docs[1].Source), + ) + // index on source again should not change the log header = &dataBlock.Header{ Round: 50, @@ -129,10 +173,17 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) { }, }, }, + Transactions: map[string]*outport.TxInfo{ + logID: { + Transaction: 
&transaction.Transaction{}, + ExecutionOrder: 0, + }, + }, } err = esProc.SaveTransactions(createOutportBlockWithHeader(body, header, pool, map[string]*alteredAccount.AlteredAccount{}, testNumOfShards)) require.Nil(t, err) + ids = []string{logID} err = esClient.DoMultiGet(context.Background(), ids, indexerdata.LogsIndex, true, genericResponse) require.Nil(t, err) require.JSONEq(t, @@ -147,6 +198,7 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) { MiniBlockHeaders: []dataBlock.MiniBlockHeader{ {}, }, + ShardID: 1, } body = &dataBlock.Body{ MiniBlocks: []*dataBlock.MiniBlock{ @@ -163,4 +215,11 @@ func TestIndexLogSourceShardAndAfterDestinationAndAgainSource(t *testing.T) { require.Nil(t, err) require.False(t, genericResponse.Docs[0].Found) + + ids = []string{event2ID, event3ID} + err = esClient.DoMultiGet(context.Background(), ids, indexerdata.EventsIndex, true, genericResponse) + require.Nil(t, err) + + require.False(t, genericResponse.Docs[0].Found) + require.False(t, genericResponse.Docs[1].Found) } diff --git a/integrationtests/testdata/logsCrossShard/event-do-something.json b/integrationtests/testdata/logsCrossShard/event-do-something.json new file mode 100644 index 00000000..2490eea4 --- /dev/null +++ b/integrationtests/testdata/logsCrossShard/event-do-something.json @@ -0,0 +1,14 @@ +{ + "logAddress": "erd1ju8pkvg57cwdmjsjx58jlmnuf4l9yspstrhr9tgsrt98n9edpm2qtlgy99", + "identifier": "do-something", + "address": "erd1w7jyzuj6cv4ngw8luhlkakatjpmjh3ql95lmxphd3vssc4vpymks6k5th7", + "topics": [ + "746f70696331", + "746f70696332" + ], + "shardID": 1, + "txHash": "63726f73732d6c6f67", + "order": 1, + "timestamp": 6040, + "txOrder": 0 +} diff --git a/integrationtests/testdata/logsCrossShard/event-transfer-destination.json b/integrationtests/testdata/logsCrossShard/event-transfer-destination.json new file mode 100644 index 00000000..c373f9be --- /dev/null +++ b/integrationtests/testdata/logsCrossShard/event-transfer-destination.json @@ -0,0 +1,15 @@ +{ + "logAddress": "erd1ju8pkvg57cwdmjsjx58jlmnuf4l9yspstrhr9tgsrt98n9edpm2qtlgy99", + "identifier": "ESDTTransfer", + "address": "erd1ju8pkvg57cwdmjsjx58jlmnuf4l9yspstrhr9tgsrt98n9edpm2qtlgy99", + "topics": [ + "455344542d61626364", + "", + "01" + ], + "shardID": 1, + "txHash": "63726f73732d6c6f67", + "order": 0, + "timestamp": 6040, + "txOrder": 0 +} diff --git a/integrationtests/testdata/logsCrossShard/event-transfer-source-first.json b/integrationtests/testdata/logsCrossShard/event-transfer-source-first.json new file mode 100644 index 00000000..d085f338 --- /dev/null +++ b/integrationtests/testdata/logsCrossShard/event-transfer-source-first.json @@ -0,0 +1,15 @@ +{ + "logAddress": "erd1ju8pkvg57cwdmjsjx58jlmnuf4l9yspstrhr9tgsrt98n9edpm2qtlgy99", + "identifier": "ESDTTransfer", + "address": "erd1ju8pkvg57cwdmjsjx58jlmnuf4l9yspstrhr9tgsrt98n9edpm2qtlgy99", + "topics": [ + "455344542d61626364", + "", + "01" + ], + "shardID": 0, + "txOrder": 0, + "txHash": "63726f73732d6c6f67", + "order": 0, + "timestamp": 5040 +} diff --git a/integrationtests/utils.go b/integrationtests/utils.go index 6c603cdc..27f07c23 100644 --- a/integrationtests/utils.go +++ b/integrationtests/utils.go @@ -25,12 +25,12 @@ var ( pubKeyConverter, _ = pubkeyConverter.NewBech32PubkeyConverter(32, addressPrefix) ) -//nolint +// nolint func setLogLevelDebug() { _ = logger.SetLogLevel("process:DEBUG") } -//nolint +// nolint func createESClient(url string) (elasticproc.DatabaseClientHandler, error) { return client.NewElasticClient(elasticsearch.Config{ 
Addresses: []string{url}, @@ -38,7 +38,7 @@ func createESClient(url string) (elasticproc.DatabaseClientHandler, error) { }) } -//nolint +// nolint func decodeAddress(address string) []byte { decoded, err := pubKeyConverter.Decode(address) log.LogIfError(err, "address", address) @@ -57,15 +57,15 @@ func CreateElasticProcessor( ValidatorPubkeyConverter: mock.NewPubkeyConverterMock(32), DBClient: esClient, EnabledIndexes: []string{dataindexer.TransactionsIndex, dataindexer.LogsIndex, dataindexer.AccountsESDTIndex, dataindexer.ScResultsIndex, - dataindexer.ReceiptsIndex, dataindexer.BlockIndex, dataindexer.AccountsIndex, dataindexer.TokensIndex, dataindexer.TagsIndex, - dataindexer.OperationsIndex, dataindexer.DelegatorsIndex, dataindexer.ESDTsIndex, dataindexer.SCDeploysIndex, dataindexer.MiniblocksIndex}, + dataindexer.ReceiptsIndex, dataindexer.BlockIndex, dataindexer.AccountsIndex, dataindexer.TokensIndex, dataindexer.TagsIndex, dataindexer.EventsIndex, + dataindexer.OperationsIndex, dataindexer.DelegatorsIndex, dataindexer.ESDTsIndex, dataindexer.SCDeploysIndex, dataindexer.MiniblocksIndex, dataindexer.ValuesIndex}, Denomination: 18, } return factory.CreateElasticProcessor(args) } -//nolint +// nolint func readExpectedResult(path string) string { jsonFile, _ := os.Open(path) byteValue, _ := ioutil.ReadAll(jsonFile) @@ -73,7 +73,7 @@ func readExpectedResult(path string) string { return string(byteValue) } -//nolint +// nolint func getElementFromSlice(path string, index int) string { fileBytes := readExpectedResult(path) slice := make([]map[string]interface{}, 0) @@ -83,7 +83,7 @@ func getElementFromSlice(path string, index int) string { return string(res) } -//nolint +// nolint func getIndexMappings(index string) (string, error) { u, _ := url.Parse(esURL) u.Path = path.Join(u.Path, index, "_mappings") diff --git a/integrationtests/valuesIndex_test.go b/integrationtests/valuesIndex_test.go new file mode 100644 index 00000000..f76835f7 --- /dev/null +++ b/integrationtests/valuesIndex_test.go @@ -0,0 +1,39 @@ +//go:build integrationtests + +package integrationtests + +import ( + "context" + "fmt" + "testing" + + "github.com/multiversx/mx-chain-es-indexer-go/mock" + indexerData "github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer" + "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/factory" + "github.com/stretchr/testify/require" +) + +func TestCheckVersionIsIndexer(t *testing.T) { + esClient, err := createESClient(esURL) + require.Nil(t, err) + + version := "v1.4.5" + args := factory.ArgElasticProcessorFactory{ + Marshalizer: &mock.MarshalizerMock{}, + Hasher: &mock.HasherMock{}, + AddressPubkeyConverter: pubKeyConverter, + ValidatorPubkeyConverter: mock.NewPubkeyConverterMock(32), + DBClient: esClient, + Denomination: 18, + Version: version, + EnabledIndexes: []string{indexerData.ValuesIndex}, + } + + _, err = factory.CreateElasticProcessor(args) + require.Nil(t, err) + + genericResponse := &GenericResponse{} + err = esClient.DoMultiGet(context.Background(), []string{"indexer-version"}, indexerData.ValuesIndex, true, genericResponse) + require.Nil(t, err) + require.Equal(t, fmt.Sprintf(`{"key":"indexer-version","value":"%s"}`, version), string(genericResponse.Docs[0].Source)) +} diff --git a/process/dataindexer/constants.go b/process/dataindexer/constants.go index 0ec25998..229fc5d8 100644 --- a/process/dataindexer/constants.go +++ b/process/dataindexer/constants.go @@ -45,6 +45,10 @@ const ( OperationsIndex = "operations" // 
ESDTsIndex is the Elasticsearch index for esdt tokens ESDTsIndex = "esdts" + // ValuesIndex is the Elasticsearch index for extra indexer information + ValuesIndex = "values" + // EventsIndex is the Elasticsearch index for log events + EventsIndex = "events" // TransactionsPolicy is the Elasticsearch policy for the transactions TransactionsPolicy = "transactions_policy" diff --git a/process/elasticproc/accounts/serialize.go b/process/elasticproc/accounts/serialize.go index 9eab9723..525f0e69 100644 --- a/process/elasticproc/accounts/serialize.go +++ b/process/elasticproc/accounts/serialize.go @@ -133,12 +133,10 @@ func prepareSerializedAccountInfo( if ('create' == ctx.op) { ctx._source = params.account } else { - if (ctx._source.containsKey('timestamp')) { - if (ctx._source.timestamp <= params.account.timestamp) { - ctx._source = params.account - } - } else { - ctx._source = params.account + if ((!ctx._source.containsKey('timestamp')) || (ctx._source.timestamp <= params.account.timestamp) ) { + params.account.forEach((key, value) -> { + ctx._source[key] = value; + }); } } ` diff --git a/process/elasticproc/accounts/serialize_test.go b/process/elasticproc/accounts/serialize_test.go index 52b0dbf4..341c6241 100644 --- a/process/elasticproc/accounts/serialize_test.go +++ b/process/elasticproc/accounts/serialize_test.go @@ -54,7 +54,7 @@ func TestSerializeAccounts(t *testing.T) { require.Equal(t, 1, len(buffSlice.Buffers())) expectedRes := `{ "update" : {"_index": "accounts", "_id" : "addr1" } } -{"scripted_upsert": true, "script": {"source": "if ('create' == ctx.op) {ctx._source = params.account} else {if (ctx._source.containsKey('timestamp')) {if (ctx._source.timestamp <= params.account.timestamp) {ctx._source = params.account}} else {ctx._source = params.account}}","lang": "painless","params": { "account": {"address":"addr1","nonce":1,"balance":"50","balanceNum":0.1,"shardID":0} }},"upsert": {}} +{"scripted_upsert": true, "script": {"source": "if ('create' == ctx.op) {ctx._source = params.account} else {if ((!ctx._source.containsKey('timestamp')) || (ctx._source.timestamp <= params.account.timestamp) ) {params.account.forEach((key, value) -> {ctx._source[key] = value;});}}","lang": "painless","params": { "account": {"address":"addr1","nonce":1,"balance":"50","balanceNum":0.1,"shardID":0} }},"upsert": {}} ` require.Equal(t, expectedRes, buffSlice.Buffers()[0].String()) } @@ -81,7 +81,7 @@ func TestSerializeAccountsESDTNonceZero(t *testing.T) { require.Equal(t, 1, len(buffSlice.Buffers())) expectedRes := `{ "update" : {"_index": "accountsesdt", "_id" : "addr1-token-abcd-00" } } -{"scripted_upsert": true, "script": {"source": "if ('create' == ctx.op) {ctx._source = params.account} else {if (ctx._source.containsKey('timestamp')) {if (ctx._source.timestamp <= params.account.timestamp) {ctx._source = params.account}} else {ctx._source = params.account}}","lang": "painless","params": { "account": {"address":"addr1","nonce":1,"balance":"10000000000000","balanceNum":1,"token":"token-abcd","properties":"000","timestamp":123,"shardID":0} }},"upsert": {}} +{"scripted_upsert": true, "script": {"source": "if ('create' == ctx.op) {ctx._source = params.account} else {if ((!ctx._source.containsKey('timestamp')) || (ctx._source.timestamp <= params.account.timestamp) ) {params.account.forEach((key, value) -> {ctx._source[key] = value;});}}","lang": "painless","params": { "account": 
{"address":"addr1","nonce":1,"balance":"10000000000000","balanceNum":1,"token":"token-abcd","properties":"000","timestamp":123,"shardID":0} }},"upsert": {}} ` require.Equal(t, expectedRes, buffSlice.Buffers()[0].String()) } @@ -107,7 +107,7 @@ func TestSerializeAccountsESDT(t *testing.T) { require.Equal(t, 1, len(buffSlice.Buffers())) expectedRes := `{ "update" : {"_index": "accountsesdt", "_id" : "addr1-token-0001-05" } } -{"scripted_upsert": true, "script": {"source": "if ('create' == ctx.op) {ctx._source = params.account} else {if (ctx._source.containsKey('timestamp')) {if (ctx._source.timestamp <= params.account.timestamp) {ctx._source = params.account}} else {ctx._source = params.account}}","lang": "painless","params": { "account": {"address":"addr1","nonce":1,"balance":"10000000000000","balanceNum":1,"token":"token-0001","tokenNonce":5,"properties":"000","shardID":0} }},"upsert": {}} +{"scripted_upsert": true, "script": {"source": "if ('create' == ctx.op) {ctx._source = params.account} else {if ((!ctx._source.containsKey('timestamp')) || (ctx._source.timestamp <= params.account.timestamp) ) {params.account.forEach((key, value) -> {ctx._source[key] = value;});}}","lang": "painless","params": { "account": {"address":"addr1","nonce":1,"balance":"10000000000000","balanceNum":1,"token":"token-0001","tokenNonce":5,"properties":"000","shardID":0} }},"upsert": {}} ` require.Equal(t, expectedRes, buffSlice.Buffers()[0].String()) } @@ -147,7 +147,7 @@ func TestSerializeAccountsNFTWithMedaData(t *testing.T) { require.Equal(t, 1, len(buffSlice.Buffers())) expectedRes := `{ "update" : {"_index": "accountsesdt", "_id" : "addr1-token-0001-16" } } -{"scripted_upsert": true, "script": {"source": "if ('create' == ctx.op) {ctx._source = params.account} else {if (ctx._source.containsKey('timestamp')) {if (ctx._source.timestamp <= params.account.timestamp) {ctx._source = params.account}} else {ctx._source = params.account}}","lang": "painless","params": { "account": {"address":"addr1","nonce":1,"balance":"10000000000000","balanceNum":1,"token":"token-0001","identifier":"token-0001-5","tokenNonce":22,"properties":"000","data":{"name":"nft","creator":"010101","royalties":1,"hash":"aGFzaA==","uris":["dXJp"],"tags":["test","free","fun"],"attributes":"dGFnczp0ZXN0LGZyZWUsZnVuO2Rlc2NyaXB0aW9uOlRoaXMgaXMgYSB0ZXN0IGRlc2NyaXB0aW9uIGZvciBhbiBhd2Vzb21lIG5mdA==","metadata":"metadata-test","nonEmptyURIs":true,"whiteListedStorage":false},"shardID":0} }},"upsert": {}} +{"scripted_upsert": true, "script": {"source": "if ('create' == ctx.op) {ctx._source = params.account} else {if ((!ctx._source.containsKey('timestamp')) || (ctx._source.timestamp <= params.account.timestamp) ) {params.account.forEach((key, value) -> {ctx._source[key] = value;});}}","lang": "painless","params": { "account": {"address":"addr1","nonce":1,"balance":"10000000000000","balanceNum":1,"token":"token-0001","identifier":"token-0001-5","tokenNonce":22,"properties":"000","data":{"name":"nft","creator":"010101","royalties":1,"hash":"aGFzaA==","uris":["dXJp"],"tags":["test","free","fun"],"attributes":"dGFnczp0ZXN0LGZyZWUsZnVuO2Rlc2NyaXB0aW9uOlRoaXMgaXMgYSB0ZXN0IGRlc2NyaXB0aW9uIGZvciBhbiBhd2Vzb21lIG5mdA==","metadata":"metadata-test","nonEmptyURIs":true,"whiteListedStorage":false},"shardID":0} }},"upsert": {}} ` require.Equal(t, expectedRes, buffSlice.Buffers()[0].String()) } diff --git a/process/elasticproc/block/blockProcessor.go b/process/elasticproc/block/blockProcessor.go index c23e3e1b..a182f358 100644 --- 
a/process/elasticproc/block/blockProcessor.go +++ b/process/elasticproc/block/blockProcessor.go @@ -73,7 +73,7 @@ func (bp *blockProcessor) PrepareBlockForDB(obh *outport.OutportBlockWithHeader) } sizeTxs := computeSizeOfTransactions(obh.TransactionPool) - miniblocksHashes := bp.getEncodedMBSHashes(obh.BlockData.Body) + miniblocksHashes := bp.getEncodedMBSHashes(obh.BlockData.Body, obh.BlockData.IntraShardMiniBlocks) leaderIndex := bp.getLeaderIndex(obh.SignersIndexes) numTxs, notarizedTxs := getTxsCount(obh.Header) @@ -126,7 +126,9 @@ func (bp *blockProcessor) PrepareBlockForDB(obh *outport.OutportBlockWithHeader) } bp.addEpochStartInfoForMeta(obh.Header, elasticBlock) - putMiniblocksDetailsInBlock(obh.Header, elasticBlock, obh.TransactionPool, obh.BlockData.Body) + + appendBlockDetailsFromHeaders(elasticBlock, obh.Header, obh.BlockData.Body, obh.TransactionPool) + appendBlockDetailsFromIntraShardMbs(elasticBlock, obh.BlockData.IntraShardMiniBlocks, obh.TransactionPool, len(obh.Header.GetMiniBlockHeaderHandlers())) return elasticBlock, nil } @@ -227,9 +229,10 @@ func (bp *blockProcessor) addEpochStartShardDataForMeta(epochStartShardData node block.EpochStartShardsData = append(block.EpochStartShardsData, shardData) } -func (bp *blockProcessor) getEncodedMBSHashes(body *block.Body) []string { +func (bp *blockProcessor) getEncodedMBSHashes(body *block.Body, intraShardMbs []*nodeBlock.MiniBlock) []string { miniblocksHashes := make([]string, 0) - for _, miniblock := range body.MiniBlocks { + mbs := append(body.MiniBlocks, intraShardMbs...) + for _, miniblock := range mbs { mbHash, errComputeHash := core.CalculateHash(bp.marshalizer, bp.hasher, miniblock) if errComputeHash != nil { log.Warn("internal error computing hash", "error", errComputeHash) @@ -244,10 +247,8 @@ func (bp *blockProcessor) getEncodedMBSHashes(body *block.Body) []string { return miniblocksHashes } -func putMiniblocksDetailsInBlock(header coreData.HeaderHandler, block *data.Block, pool *outport.TransactionPool, body *block.Body) { - mbHeaders := header.GetMiniBlockHeaderHandlers() - - for idx, mbHeader := range mbHeaders { +func appendBlockDetailsFromHeaders(block *data.Block, header coreData.HeaderHandler, body *block.Body, pool *outport.TransactionPool) { + for idx, mbHeader := range header.GetMiniBlockHeaderHandlers() { mbType := nodeBlock.Type(mbHeader.GetTypeInt32()) if mbType == nodeBlock.PeerBlock { continue @@ -268,6 +269,42 @@ func putMiniblocksDetailsInBlock(header coreData.HeaderHandler, block *data.Bloc } } +func appendBlockDetailsFromIntraShardMbs(block *data.Block, intraShardMbs []*block.MiniBlock, pool *outport.TransactionPool, offset int) { + for idx, intraMB := range intraShardMbs { + if intraMB.Type == nodeBlock.PeerBlock || intraMB.Type == nodeBlock.ReceiptBlock { + continue + } + + block.MiniBlocksDetails = append(block.MiniBlocksDetails, &data.MiniBlocksDetails{ + IndexFirstProcessedTx: 0, + IndexLastProcessedTx: int32(len(intraMB.GetTxHashes()) - 1), + SenderShardID: intraMB.GetSenderShardID(), + ReceiverShardID: intraMB.GetReceiverShardID(), + MBIndex: idx + offset, + Type: intraMB.Type.String(), + ProcessingType: nodeBlock.Normal.String(), + TxsHashes: hexEncodeSlice(intraMB.TxHashes), + ExecutionOrderTxsIndices: extractExecutionOrderIntraShardMBUnsigned(intraMB, pool), + }) + } +} + +func extractExecutionOrderIntraShardMBUnsigned(mb *block.MiniBlock, pool *outport.TransactionPool) []int { + executionOrderTxsIndices := make([]int, len(mb.TxHashes)) + for idx, txHash := range mb.TxHashes { + 
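		// getExecutionOrderForTx is the existing helper also used by
		// extractExecutionOrderIndicesFromPool below: the miniblock type selects
		// which transaction-pool map is searched for txHash, and misses are
		// recorded with the notFound sentinel so the result keeps one entry per hash.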
executionOrder, found := getExecutionOrderForTx(txHash, int32(mb.Type), pool) + if !found { + log.Warn("blockProcessor.extractExecutionOrderIntraShardMBUnsigned cannot find tx in pool", "txHash", hex.EncodeToString(txHash)) + executionOrderTxsIndices[idx] = notFound + continue + } + + executionOrderTxsIndices[idx] = int(executionOrder) + } + + return executionOrderTxsIndices +} + func extractExecutionOrderIndicesFromPool(mbHeader coreData.MiniBlockHeaderHandler, txsHashes [][]byte, pool *outport.TransactionPool) []int { mbType := mbHeader.GetTypeInt32() executionOrderTxsIndices := make([]int, len(txsHashes)) diff --git a/process/elasticproc/block/blockProcessor_test.go b/process/elasticproc/block/blockProcessor_test.go index d727fc1f..b0208c23 100644 --- a/process/elasticproc/block/blockProcessor_test.go +++ b/process/elasticproc/block/blockProcessor_test.go @@ -368,7 +368,7 @@ func TestBlockProcessor_PrepareBlockForDBMiniBlocksDetails(t *testing.T) { } mbhrBytes, _ := gogoMarshaller.Marshal(mbhr) - txHash, notExecutedTxHash, notFoundTxHash, invalidTxHash, rewardsTxHash, scrHash := "tx", "notExecuted", "notFound", "invalid", "reward", "scr" + txHash, notExecutedTxHash, notFoundTxHash, invalidTxHash, rewardsTxHash, scrHash, intraSCR := "tx", "notExecuted", "notFound", "invalid", "reward", "scr", "intraSCR" header := &dataBlock.Header{ TxCount: 5, @@ -397,6 +397,12 @@ func TestBlockProcessor_PrepareBlockForDBMiniBlocksDetails(t *testing.T) { Header: header, OutportBlock: &outport.OutportBlock{ BlockData: &outport.BlockData{ + IntraShardMiniBlocks: []*dataBlock.MiniBlock{ + { + Type: dataBlock.SmartContractResultBlock, + TxHashes: [][]byte{[]byte(intraSCR)}, + }, + }, HeaderBytes: headerBytes, HeaderHash: []byte("hash"), Body: &dataBlock.Body{ @@ -446,6 +452,10 @@ func TestBlockProcessor_PrepareBlockForDBMiniBlocksDetails(t *testing.T) { SmartContractResult: &smartContractResult.SmartContractResult{}, ExecutionOrder: 0, }, + hex.EncodeToString([]byte(intraSCR)): { + SmartContractResult: &smartContractResult.SmartContractResult{}, + ExecutionOrder: 4, + }, }, }, HeaderGasConsumption: &outport.HeaderGasConsumption{}, @@ -458,7 +468,7 @@ func TestBlockProcessor_PrepareBlockForDBMiniBlocksDetails(t *testing.T) { require.Equal(t, &data.Block{ Hash: "68617368", Size: int64(723), - SizeTxs: 15, + SizeTxs: 21, AccumulatedFees: "0", DeveloperFees: "0", TxCount: uint32(5), @@ -468,6 +478,7 @@ func TestBlockProcessor_PrepareBlockForDBMiniBlocksDetails(t *testing.T) { "1183f422a5b76c3cb7b439334f1fe7235c8d09f577e0f1e15e62cd05b9a81950", "b24e307f3917e84603d3ebfb9c03c8fc651b62cb68ca884c3ff015b66a610a79", "c0a855563172b2f72be569963d26d4fae38d4371342e2bf3ded93466a72f36f3", + "381b0f52b35781ddce70dc7ee08907a29f49ed9c46ea0b7b59e5833ba3213d10", }, MiniBlocksDetails: []*data.MiniBlocksDetails{ { @@ -502,6 +513,13 @@ func TestBlockProcessor_PrepareBlockForDBMiniBlocksDetails(t *testing.T) { ProcessingType: dataBlock.Normal.String(), ExecutionOrderTxsIndices: []int{0}, TxsHashes: []string{"736372"}}, + {IndexFirstProcessedTx: 0, + IndexLastProcessedTx: 0, + MBIndex: 4, + Type: dataBlock.SmartContractResultBlock.String(), + ProcessingType: dataBlock.Normal.String(), + ExecutionOrderTxsIndices: []int{4}, + TxsHashes: []string{"696e747261534352"}}, }, }, dbBlock) } diff --git a/process/elasticproc/converters/field.go b/process/elasticproc/converters/field.go index d96d3571..27b6be81 100644 --- a/process/elasticproc/converters/field.go +++ b/process/elasticproc/converters/field.go @@ -21,7 +21,7 @@ func 
TruncateFieldIfExceedsMaxLengthBase64(field string) string { return field } -//TruncateSliceElementsIfExceedsMaxLength will truncate the provided slice of the field if the max length is exceeded +// TruncateSliceElementsIfExceedsMaxLength will truncate the provided slice of the field if the max length is exceeded func TruncateSliceElementsIfExceedsMaxLength(fields []string) []string { var localFields []string for _, field := range fields { diff --git a/process/elasticproc/elasticProcessor.go b/process/elasticproc/elasticProcessor.go index 6835a8df..640e5f09 100644 --- a/process/elasticproc/elasticProcessor.go +++ b/process/elasticproc/elasticProcessor.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/hex" + "encoding/json" "fmt" "sync" @@ -29,11 +30,11 @@ var ( elasticIndexer.TransactionsIndex, elasticIndexer.BlockIndex, elasticIndexer.MiniblocksIndex, elasticIndexer.RatingIndex, elasticIndexer.RoundsIndex, elasticIndexer.ValidatorsIndex, elasticIndexer.AccountsIndex, elasticIndexer.AccountsHistoryIndex, elasticIndexer.ReceiptsIndex, elasticIndexer.ScResultsIndex, elasticIndexer.AccountsESDTHistoryIndex, elasticIndexer.AccountsESDTIndex, elasticIndexer.EpochInfoIndex, elasticIndexer.SCDeploysIndex, elasticIndexer.TokensIndex, elasticIndexer.TagsIndex, elasticIndexer.LogsIndex, elasticIndexer.DelegatorsIndex, elasticIndexer.OperationsIndex, - elasticIndexer.ESDTsIndex, + elasticIndexer.ESDTsIndex, elasticIndexer.ValuesIndex, elasticIndexer.EventsIndex, } ) -type objectsMap = map[string]interface{} +const versionStr = "indexer-version" // ArgElasticProcessor holds all dependencies required by the elasticProcessor in order to create // new instances @@ -53,6 +54,7 @@ type ArgElasticProcessor struct { DBClient DatabaseClientHandler LogsAndEventsProc DBLogsAndEventsHandler OperationsProc OperationsHandler + Version string } type elasticProcessor struct { @@ -97,7 +99,9 @@ func NewElasticProcessor(arguments *ArgElasticProcessor) (*elasticProcessor, err return nil, err } - return ei, nil + err = ei.indexVersion(arguments.Version) + + return ei, err } // TODO move all the index create part in a new component @@ -134,6 +138,32 @@ func (ei *elasticProcessor) init(useKibana bool, indexTemplates, _ map[string]*b return nil } +func (ei *elasticProcessor) indexVersion(version string) error { + if version == "" { + log.Debug("ei.elasticProcessor indexer version is empty") + return nil + } + + keyValueObj := &data.KeyValueObj{ + Key: versionStr, + Value: version, + } + + meta := []byte(fmt.Sprintf(`{ "index" : { "_index":"%s", "_id" : "%s" } }%s`, elasticIndexer.ValuesIndex, versionStr, "\n")) + keyValueObjBytes, err := json.Marshal(keyValueObj) + if err != nil { + return err + } + + buffSlice := data.NewBufferSlice(0) + err = buffSlice.PutData(meta, keyValueObjBytes) + if err != nil { + return err + } + + return ei.elasticClient.DoBulkRequest(context.Background(), buffSlice.Buffers()[0], "") +} + // nolint func (ei *elasticProcessor) createIndexPolicies(indexPolicies map[string]*bytes.Buffer) error { indexesPolicies := []string{elasticIndexer.TransactionsPolicy, elasticIndexer.BlockPolicy, elasticIndexer.MiniblocksPolicy, elasticIndexer.RatingPolicy, elasticIndexer.RoundsPolicy, elasticIndexer.ValidatorsPolicy, @@ -298,6 +328,11 @@ func (ei *elasticProcessor) RemoveTransactions(header coreData.HeaderHandler, bo return err } + err = ei.removeFromIndexByTimestampAndShardID(header.GetTimeStamp(), header.GetShardID(), elasticIndexer.EventsIndex) + if err != nil { + return err + } + return 
ei.updateDelegatorsInCaseOfRevert(header, body) } @@ -330,20 +365,21 @@ func (ei *elasticProcessor) removeIfHashesNotEmpty(index string, hashes []string // RemoveAccountsESDT will remove data from accountsesdt index and accountsesdthistory func (ei *elasticProcessor) RemoveAccountsESDT(headerTimestamp uint64, shardID uint32) error { - ctxWithValue := context.WithValue(context.Background(), request.ContextKey, request.ExtendTopicWithShardID(request.RemoveTopic, shardID)) - query := fmt.Sprintf(`{"query": {"bool": {"must": [{"match": {"shardID": {"query": %d,"operator": "AND"}}},{"match": {"timestamp": {"query": "%d","operator": "AND"}}}]}}}`, shardID, headerTimestamp) - err := ei.elasticClient.DoQueryRemove( - ctxWithValue, - elasticIndexer.AccountsESDTIndex, - bytes.NewBuffer([]byte(query)), - ) + err := ei.removeFromIndexByTimestampAndShardID(headerTimestamp, shardID, elasticIndexer.AccountsESDTIndex) if err != nil { return err } + return ei.removeFromIndexByTimestampAndShardID(headerTimestamp, shardID, elasticIndexer.AccountsESDTHistoryIndex) +} + +func (ei *elasticProcessor) removeFromIndexByTimestampAndShardID(headerTimestamp uint64, shardID uint32, index string) error { + ctxWithValue := context.WithValue(context.Background(), request.ContextKey, request.ExtendTopicWithShardID(request.RemoveTopic, shardID)) + query := fmt.Sprintf(`{"query": {"bool": {"must": [{"match": {"shardID": {"query": %d,"operator": "AND"}}},{"match": {"timestamp": {"query": "%d","operator": "AND"}}}]}}}`, shardID, headerTimestamp) + return ei.elasticClient.DoQueryRemove( ctxWithValue, - elasticIndexer.AccountsESDTHistoryIndex, + index, bytes.NewBuffer([]byte(query)), ) } @@ -394,7 +430,12 @@ func (ei *elasticProcessor) SaveTransactions(obh *outport.OutportBlockWithHeader return err } - err = ei.prepareAndIndexLogs(obh.TransactionPool.Logs, headerTimestamp, buffers) + err = ei.indexLogs(logsData.DBLogs, buffers) + if err != nil { + return err + } + + err = ei.indexEvents(logsData.DBEvents, buffers) if err != nil { return err } @@ -481,16 +522,22 @@ func (ei *elasticProcessor) indexTransactionsFeeData(txsHashFeeData map[string]* return ei.transactionsProc.SerializeTransactionsFeeData(txsHashFeeData, buffSlice, elasticIndexer.OperationsIndex) } -func (ei *elasticProcessor) prepareAndIndexLogs(logsAndEvents []*outport.LogData, timestamp uint64, buffSlice *data.BufferSlice) error { +func (ei *elasticProcessor) indexLogs(logsDB []*data.Logs, buffSlice *data.BufferSlice) error { if !ei.isIndexEnabled(elasticIndexer.LogsIndex) { return nil } - logsDB := ei.logsAndEventsProc.PrepareLogsForDB(logsAndEvents, timestamp) - return ei.logsAndEventsProc.SerializeLogs(logsDB, buffSlice, elasticIndexer.LogsIndex) } +func (ei *elasticProcessor) indexEvents(eventsDB []*data.LogEvent, buffSlice *data.BufferSlice) error { + if !ei.isIndexEnabled(elasticIndexer.EventsIndex) { + return nil + } + + return ei.logsAndEventsProc.SerializeEvents(eventsDB, buffSlice, elasticIndexer.EventsIndex) +} + func (ei *elasticProcessor) indexScDeploys(deployData map[string]*data.ScDeployInfo, changeOwnerOperation map[string]*data.OwnerData, buffSlice *data.BufferSlice) error { if !ei.isIndexEnabled(elasticIndexer.SCDeploysIndex) { return nil diff --git a/process/elasticproc/elasticProcessor_test.go b/process/elasticproc/elasticProcessor_test.go index 381db5db..9f3311af 100644 --- a/process/elasticproc/elasticProcessor_test.go +++ b/process/elasticproc/elasticProcessor_test.go @@ -463,10 +463,18 @@ func TestElasticProcessor_RemoveTransactions(t 
*testing.T) { dbWriter := &mock.DatabaseWriterStub{ DoQueryRemoveCalled: func(index string, body *bytes.Buffer) error { bodyStr := body.String() - require.Contains(t, []string{dataindexer.TransactionsIndex, dataindexer.OperationsIndex, dataindexer.LogsIndex}, index) - require.True(t, strings.Contains(bodyStr, expectedHashes[0])) - require.True(t, strings.Contains(bodyStr, expectedHashes[1])) - called = true + require.Contains(t, []string{dataindexer.TransactionsIndex, dataindexer.OperationsIndex, dataindexer.LogsIndex, dataindexer.EventsIndex}, index) + if index != dataindexer.EventsIndex { + require.True(t, strings.Contains(bodyStr, expectedHashes[0])) + require.True(t, strings.Contains(bodyStr, expectedHashes[1])) + called = true + } else { + require.Equal(t, + `{"query": {"bool": {"must": [{"match": {"shardID": {"query": 4294967295,"operator": "AND"}}},{"match": {"timestamp": {"query": "0","operator": "AND"}}}]}}}`, + body.String(), + ) + } + return nil }, } diff --git a/process/elasticproc/factory/elasticProcessorFactory.go b/process/elasticproc/factory/elasticProcessorFactory.go index f10ed48f..eb10110c 100644 --- a/process/elasticproc/factory/elasticProcessorFactory.go +++ b/process/elasticproc/factory/elasticProcessorFactory.go @@ -26,6 +26,7 @@ type ArgElasticProcessorFactory struct { ValidatorPubkeyConverter core.PubkeyConverter DBClient elasticproc.DatabaseClientHandler EnabledIndexes []string + Version string Denomination int BulkRequestMaxSize int UseKibana bool @@ -120,6 +121,7 @@ func CreateElasticProcessor(arguments ArgElasticProcessorFactory) (dataindexer.E IndexPolicies: indexPolicies, OperationsProc: operationsProc, ImportDB: arguments.ImportDB, + Version: arguments.Version, } return elasticproc.NewElasticProcessor(args) diff --git a/process/elasticproc/interface.go b/process/elasticproc/interface.go index c059e6cb..997697b2 100644 --- a/process/elasticproc/interface.go +++ b/process/elasticproc/interface.go @@ -91,7 +91,6 @@ type DBValidatorsHandler interface { // DBLogsAndEventsHandler defines the actions that a logs and events handler should do type DBLogsAndEventsHandler interface { - PrepareLogsForDB(logsAndEvents []*outport.LogData, timestamp uint64) []*data.Logs ExtractDataFromLogs( logsAndEvents []*outport.LogData, preparedResults *data.PreparedResults, @@ -100,6 +99,7 @@ type DBLogsAndEventsHandler interface { numOfShards uint32, ) *data.PreparedLogsResults + SerializeEvents(events []*data.LogEvent, buffSlice *data.BufferSlice, index string) error SerializeLogs(logs []*data.Logs, buffSlice *data.BufferSlice, index string) error SerializeSCDeploys(deploysInfo map[string]*data.ScDeployInfo, buffSlice *data.BufferSlice, index string) error SerializeChangeOwnerOperations(changeOwnerOperations map[string]*data.OwnerData, buffSlice *data.BufferSlice, index string) error diff --git a/process/elasticproc/logsevents/logsAndEventsProcessor.go b/process/elasticproc/logsevents/logsAndEventsProcessor.go index f633de3c..d3076699 100644 --- a/process/elasticproc/logsevents/logsAndEventsProcessor.go +++ b/process/elasticproc/logsevents/logsAndEventsProcessor.go @@ -1,6 +1,8 @@ package logsevents import ( + "encoding/hex" + "fmt" "time" "github.com/multiversx/mx-chain-core-go/core" @@ -14,6 +16,8 @@ import ( "github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer" ) +const eventIDFormat = "%s-%d-%d" + // ArgsLogsAndEventsProcessor holds all dependencies required to create new instances of logsAndEventsProcessor type ArgsLogsAndEventsProcessor struct { 
PubKeyConverter core.PubkeyConverter @@ -26,8 +30,6 @@ type logsAndEventsProcessor struct { hasher hashing.Hasher pubKeyConverter core.PubkeyConverter eventsProcessors []eventsProcessor - - logsData *logsData } // NewLogsAndEventsProcessor will create a new instance for the logsAndEventsProcessor @@ -93,85 +95,88 @@ func (lep *logsAndEventsProcessor) ExtractDataFromLogs( shardID uint32, numOfShards uint32, ) *data.PreparedLogsResults { - lep.logsData = newLogsData(timestamp, preparedResults.Transactions, preparedResults.ScResults) - + lgData := newLogsData(timestamp, preparedResults.Transactions, preparedResults.ScResults) for _, txLog := range logsAndEvents { if txLog == nil { continue } events := txLog.Log.Events - lep.processEvents(txLog.TxHash, txLog.Log.Address, events, shardID, numOfShards) + lep.processEvents(lgData, txLog.TxHash, txLog.Log.Address, events, shardID, numOfShards) - tx, ok := lep.logsData.txsMap[txLog.TxHash] + tx, ok := lgData.txsMap[txLog.TxHash] if ok { tx.HasLogs = true continue } - scr, ok := lep.logsData.scrsMap[txLog.TxHash] + scr, ok := lgData.scrsMap[txLog.TxHash] if ok { scr.HasLogs = true continue } } + dbLogs, dbEvents := lep.prepareLogsForDB(lgData, logsAndEvents, timestamp, shardID) + return &data.PreparedLogsResults{ - Tokens: lep.logsData.tokens, - ScDeploys: lep.logsData.scDeploys, - TokensInfo: lep.logsData.tokensInfo, - TokensSupply: lep.logsData.tokensSupply, - Delegators: lep.logsData.delegators, - NFTsDataUpdates: lep.logsData.nftsDataUpdates, - TokenRolesAndProperties: lep.logsData.tokenRolesAndProperties, - TxHashStatusInfo: lep.logsData.txHashStatusInfoProc.getAllRecords(), - ChangeOwnerOperations: lep.logsData.changeOwnerOperations, + Tokens: lgData.tokens, + ScDeploys: lgData.scDeploys, + TokensInfo: lgData.tokensInfo, + TokensSupply: lgData.tokensSupply, + Delegators: lgData.delegators, + NFTsDataUpdates: lgData.nftsDataUpdates, + TokenRolesAndProperties: lgData.tokenRolesAndProperties, + TxHashStatusInfo: lgData.txHashStatusInfoProc.getAllRecords(), + ChangeOwnerOperations: lgData.changeOwnerOperations, + DBLogs: dbLogs, + DBEvents: dbEvents, } } -func (lep *logsAndEventsProcessor) processEvents(logHashHexEncoded string, logAddress []byte, events []*transaction.Event, shardID uint32, numOfShards uint32) { +func (lep *logsAndEventsProcessor) processEvents(lgData *logsData, logHashHexEncoded string, logAddress []byte, events []*transaction.Event, shardID uint32, numOfShards uint32) { for _, event := range events { if check.IfNil(event) { continue } - lep.processEvent(logHashHexEncoded, logAddress, event, shardID, numOfShards) + lep.processEvent(lgData, logHashHexEncoded, logAddress, event, shardID, numOfShards) } } -func (lep *logsAndEventsProcessor) processEvent(logHashHexEncoded string, logAddress []byte, event coreData.EventHandler, shardID uint32, numOfShards uint32) { +func (lep *logsAndEventsProcessor) processEvent(lgData *logsData, logHashHexEncoded string, logAddress []byte, event coreData.EventHandler, shardID uint32, numOfShards uint32) { for _, proc := range lep.eventsProcessors { res := proc.processEvent(&argsProcessEvent{ event: event, txHashHexEncoded: logHashHexEncoded, logAddress: logAddress, - tokens: lep.logsData.tokens, - tokensSupply: lep.logsData.tokensSupply, - timestamp: lep.logsData.timestamp, - scDeploys: lep.logsData.scDeploys, - txs: lep.logsData.txsMap, - scrs: lep.logsData.scrsMap, - tokenRolesAndProperties: lep.logsData.tokenRolesAndProperties, - txHashStatusInfoProc: lep.logsData.txHashStatusInfoProc, - 
changeOwnerOperations: lep.logsData.changeOwnerOperations, + tokens: lgData.tokens, + tokensSupply: lgData.tokensSupply, + timestamp: lgData.timestamp, + scDeploys: lgData.scDeploys, + txs: lgData.txsMap, + scrs: lgData.scrsMap, + tokenRolesAndProperties: lgData.tokenRolesAndProperties, + txHashStatusInfoProc: lgData.txHashStatusInfoProc, + changeOwnerOperations: lgData.changeOwnerOperations, selfShardID: shardID, numOfShards: numOfShards, }) if res.tokenInfo != nil { - lep.logsData.tokensInfo = append(lep.logsData.tokensInfo, res.tokenInfo) + lgData.tokensInfo = append(lgData.tokensInfo, res.tokenInfo) } if res.delegator != nil { - lep.logsData.delegators[res.delegator.Address+res.delegator.Contract] = res.delegator + lgData.delegators[res.delegator.Address+res.delegator.Contract] = res.delegator } if res.updatePropNFT != nil { - lep.logsData.nftsDataUpdates = append(lep.logsData.nftsDataUpdates, res.updatePropNFT) + lgData.nftsDataUpdates = append(lgData.nftsDataUpdates, res.updatePropNFT) } - tx, ok := lep.logsData.txsMap[logHashHexEncoded] + tx, ok := lgData.txsMap[logHashHexEncoded] if ok { tx.HasOperations = true continue } - scr, ok := lep.logsData.scrsMap[logHashHexEncoded] + scr, ok := lgData.scrsMap[logHashHexEncoded] if ok { scr.HasOperations = true continue @@ -183,35 +188,37 @@ func (lep *logsAndEventsProcessor) processEvent(logHashHexEncoded string, logAdd } } -// PrepareLogsForDB will prepare logs for database -func (lep *logsAndEventsProcessor) PrepareLogsForDB( +func (lep *logsAndEventsProcessor) prepareLogsForDB( + lgData *logsData, logsAndEvents []*outport.LogData, timestamp uint64, -) []*data.Logs { + shardID uint32, +) ([]*data.Logs, []*data.LogEvent) { logs := make([]*data.Logs, 0, len(logsAndEvents)) + events := make([]*data.LogEvent, 0) for _, txLog := range logsAndEvents { if txLog == nil { continue } - logs = append(logs, lep.prepareLogsForDB(txLog.TxHash, txLog.Log, timestamp)) + dbLog, logEvents := lep.prepareLog(lgData, txLog.TxHash, txLog.Log, timestamp, shardID) + + logs = append(logs, dbLog) + events = append(events, logEvents...) 
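	// Each event document gets ID = fmt.Sprintf(eventIDFormat, dbLog.ID, shardID, event.Order),
	// i.e. "<txHash>-<shardID>-<order>" such as "747848617368-1-0", so the same log
	// indexed on both the source and the destination shard yields distinct documents
	// in the events index (see logsCrossShard_test.go).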
diff --git a/process/elasticproc/logsevents/serialize.go b/process/elasticproc/logsevents/serialize.go
index 488b1282..a96a125a 100644
--- a/process/elasticproc/logsevents/serialize.go
+++ b/process/elasticproc/logsevents/serialize.go
@@ -10,6 +10,45 @@ import (
 	"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/tokeninfo"
 )
 
+// SerializeEvents will serialize the provided events in a way that Elasticsearch expects a bulk request
+func (*logsAndEventsProcessor) SerializeEvents(events []*data.LogEvent, buffSlice *data.BufferSlice, index string) error {
+	for _, event := range events {
+		meta := []byte(fmt.Sprintf(`{ "update" : { "_index":"%s", "_id" : "%s" } }%s`, index, converters.JsonEscape(event.ID), "\n"))
+		serializedData, errMarshal := json.Marshal(event)
+		if errMarshal != nil {
+			return errMarshal
+		}
+
+		codeToExecute := `
+	if ('create' == ctx.op) {
+		ctx._source = params.event
+	} else {
+		if (ctx._source.containsKey('timestamp')) {
+			if (ctx._source.timestamp <= params.event.timestamp) {
+				ctx._source = params.event
+			}
+		} else {
+			ctx._source = params.event
+		}
+	}
+`
+		serializedDataStr := fmt.Sprintf(`{"scripted_upsert": true, "script": {`+
+			`"source": "%s",`+
+			`"lang": "painless",`+
+			`"params": { "event": %s }},`+
+			`"upsert": {}}`,
+			converters.FormatPainlessSource(codeToExecute), serializedData,
+		)
+
+		err := buffSlice.PutData(meta, []byte(serializedDataStr))
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 // SerializeLogs will serialize the provided logs in a way that Elasticsearch expects a bulk request
 func (*logsAndEventsProcessor) SerializeLogs(logs []*data.Logs, buffSlice *data.BufferSlice, index string) error {
 	for _, lg := range logs {
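Each event therefore becomes a two-line entry in the bulk payload: an `update` action keyed by the event ID, followed by a `scripted_upsert` body whose Painless script only replaces an existing document when the incoming event's timestamp is not older. A hand-written illustration of one such entry (the index name and the condensed script body are approximations; the exact formatting depends on `converters.FormatPainlessSource`):

```go
package main

import "fmt"

func main() {
	// Hand-written approximation of one bulk entry emitted by SerializeEvents
	// for an assumed "events" index; whitespace and the script body are condensed.
	const bulkEntry = `{ "update" : { "_index":"events", "_id" : "747848617368-1-0" } }
{"scripted_upsert": true, "script": {"source": "if ('create' == ctx.op) { ctx._source = params.event } else { ... }", "lang": "painless", "params": { "event": {"txHash":"747848617368","shardID":1,"timestamp":1234} }}, "upsert": {}}`

	fmt.Println(bulkEntry)
}
```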
diff --git a/process/elasticproc/templatesAndPolicies/noKibana.go b/process/elasticproc/templatesAndPolicies/noKibana.go
index 3f6092b9..a6928858 100644
--- a/process/elasticproc/templatesAndPolicies/noKibana.go
+++ b/process/elasticproc/templatesAndPolicies/noKibana.go
@@ -40,6 +40,8 @@ func (tr *templatesAndPolicyReaderNoKibana) GetElasticTemplatesAndPolicies() (ma
 	indexTemplates[indexer.DelegatorsIndex] = noKibana.Delegators.ToBuffer()
 	indexTemplates[indexer.OperationsIndex] = noKibana.Operations.ToBuffer()
 	indexTemplates[indexer.ESDTsIndex] = noKibana.ESDTs.ToBuffer()
+	indexTemplates[indexer.ValuesIndex] = noKibana.Values.ToBuffer()
+	indexTemplates[indexer.EventsIndex] = noKibana.Events.ToBuffer()
 
 	return indexTemplates, indexPolicies, nil
 }
diff --git a/process/elasticproc/templatesAndPolicies/noKibana_test.go b/process/elasticproc/templatesAndPolicies/noKibana_test.go
index dd000be2..7cbf22b9 100644
--- a/process/elasticproc/templatesAndPolicies/noKibana_test.go
+++ b/process/elasticproc/templatesAndPolicies/noKibana_test.go
@@ -14,5 +14,5 @@ func TestTemplatesAndPolicyReaderNoKibana_GetElasticTemplatesAndPolicies(t *test
 	templates, policies, err := reader.GetElasticTemplatesAndPolicies()
 	require.Nil(t, err)
 	require.Len(t, policies, 0)
-	require.Len(t, templates, 21)
+	require.Len(t, templates, 23)
 }
diff --git a/process/elasticproc/transactions/smartContractResultsProcessor.go b/process/elasticproc/transactions/smartContractResultsProcessor.go
index e5f6a72e..2fcb7ccf 100644
--- a/process/elasticproc/transactions/smartContractResultsProcessor.go
+++ b/process/elasticproc/transactions/smartContractResultsProcessor.go
@@ -189,6 +189,7 @@ func (proc *smartContractResultsProcessor) prepareSmartContractResult(
 		OriginalSender:   originalSenderAddr,
 		InitialTxFee:     feeInfo.Fee.String(),
 		InitialTxGasUsed: feeInfo.GasUsed,
+		ExecutionOrder:   int(scrInfo.ExecutionOrder),
 	}
 }
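With `ExecutionOrder` now persisted on both regular transactions and smart contract results, and copied into each event's `txOrder`, a downstream consumer can reconstruct the execution sequence inside a block by sorting on (timestamp, txOrder, order). A sketch with a simplified stand-in for `data.LogEvent`:

```go
package main

import (
	"fmt"
	"sort"
)

// event is a simplified stand-in for data.LogEvent.
type event struct {
	Timestamp uint64
	TxOrder   int // execution order of the parent transaction or SCR
	Order     int // position of the event inside its log
}

func main() {
	events := []event{
		{Timestamp: 1234, TxOrder: 1, Order: 0},
		{Timestamp: 1234, TxOrder: 0, Order: 1},
		{Timestamp: 1234, TxOrder: 0, Order: 0},
	}

	// Block timestamp first, then tx execution order, then event order.
	sort.Slice(events, func(i, j int) bool {
		if events[i].Timestamp != events[j].Timestamp {
			return events[i].Timestamp < events[j].Timestamp
		}
		if events[i].TxOrder != events[j].TxOrder {
			return events[i].TxOrder < events[j].TxOrder
		}
		return events[i].Order < events[j].Order
	})

	fmt.Println(events) // [{1234 0 0} {1234 0 1} {1234 1 0}]
}
```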
diff --git a/process/elasticproc/transactions/transactionDBBuilder.go b/process/elasticproc/transactions/transactionDBBuilder.go
index 727563ce..0b497ef7 100644
--- a/process/elasticproc/transactions/transactionDBBuilder.go
+++ b/process/elasticproc/transactions/transactionDBBuilder.go
@@ -11,7 +11,6 @@ import (
 	"github.com/multiversx/mx-chain-core-go/data/block"
 	"github.com/multiversx/mx-chain-core-go/data/outport"
 	"github.com/multiversx/mx-chain-core-go/data/receipt"
-	"github.com/multiversx/mx-chain-core-go/data/rewardTx"
 	"github.com/multiversx/mx-chain-es-indexer-go/data"
 	"github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer"
 	"github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/converters"
@@ -122,17 +121,19 @@ func (dtb *dbTransactionBuilder) prepareTransaction(
 		Version:           tx.Version,
 		GuardianAddress:   guardianAddress,
 		GuardianSignature: hex.EncodeToString(tx.GuardianSignature),
+		ExecutionOrder:    int(txInfo.ExecutionOrder),
 	}
 }
 
 func (dtb *dbTransactionBuilder) prepareRewardTransaction(
-	rTx *rewardTx.RewardTx,
+	rTxInfo *outport.RewardInfo,
 	txHash []byte,
 	mbHash []byte,
 	mb *block.MiniBlock,
 	header coreData.HeaderHandler,
 	txStatus string,
 ) *data.Transaction {
+	rTx := rTxInfo.Reward
 	valueNum, err := dtb.balanceConverter.ConvertBigValueToFloat(rTx.Value)
 	if err != nil {
 		log.Warn("dbTransactionBuilder.prepareRewardTransaction cannot compute value as num", "value", rTx.Value,
@@ -142,23 +143,24 @@ func (dtb *dbTransactionBuilder) prepareRewardTransaction(
 	receiverAddr := dtb.addressPubkeyConverter.SilentEncode(rTx.RcvAddr, log)
 
 	return &data.Transaction{
-		Hash:          hex.EncodeToString(txHash),
-		MBHash:        hex.EncodeToString(mbHash),
-		Nonce:         0,
-		Round:         rTx.Round,
-		Value:         rTx.Value.String(),
-		ValueNum:      valueNum,
-		Receiver:      receiverAddr,
-		Sender:        fmt.Sprintf("%d", core.MetachainShardId),
-		ReceiverShard: mb.ReceiverShardID,
-		SenderShard:   mb.SenderShardID,
-		GasPrice:      0,
-		GasLimit:      0,
-		Data:          make([]byte, 0),
-		Signature:     "",
-		Timestamp:     time.Duration(header.GetTimeStamp()),
-		Status:        txStatus,
-		Operation:     rewardsOperation,
+		Hash:           hex.EncodeToString(txHash),
+		MBHash:         hex.EncodeToString(mbHash),
+		Nonce:          0,
+		Round:          rTx.Round,
+		Value:          rTx.Value.String(),
+		ValueNum:       valueNum,
+		Receiver:       receiverAddr,
+		Sender:         fmt.Sprintf("%d", core.MetachainShardId),
+		ReceiverShard:  mb.ReceiverShardID,
+		SenderShard:    mb.SenderShardID,
+		GasPrice:       0,
+		GasLimit:       0,
+		Data:           make([]byte, 0),
+		Signature:      "",
+		Timestamp:      time.Duration(header.GetTimeStamp()),
+		Status:         txStatus,
+		Operation:      rewardsOperation,
+		ExecutionOrder: int(rTxInfo.ExecutionOrder),
 	}
 }
diff --git a/process/elasticproc/transactions/transactionDBBuilder_test.go b/process/elasticproc/transactions/transactionDBBuilder_test.go
index 5b3d8759..ceeca3de 100644
--- a/process/elasticproc/transactions/transactionDBBuilder_test.go
+++ b/process/elasticproc/transactions/transactionDBBuilder_test.go
@@ -116,7 +116,10 @@ func TestGetTransactionByType_RewardTx(t *testing.T) {
 	header := &block.Header{Nonce: 2}
 	status := "Success"
 
-	resultTx := cp.prepareRewardTransaction(rwdTx, txHash, mbHash, mb, header, status)
+	rewardInfo := &outport.RewardInfo{
+		Reward: rwdTx,
+	}
+	resultTx := cp.prepareRewardTransaction(rewardInfo, txHash, mbHash, mb, header, status)
 	expectedTx := &data.Transaction{
 		Hash:   hex.EncodeToString(txHash),
 		MBHash: hex.EncodeToString(mbHash),
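`prepareRewardTransaction` now takes the `outport.RewardInfo` envelope instead of the bare `rewardTx.RewardTx`, so per-transaction metadata such as `ExecutionOrder` travels together with the payload and call sites simply pass the envelope through. A generic sketch of the pattern, using hypothetical types rather than the real outport structs:

```go
package main

import "fmt"

// payload stands in for the bare reward transaction.
type payload struct {
	Value string
}

// envelope stands in for outport.RewardInfo: the payload plus metadata.
type envelope struct {
	Reward         payload
	ExecutionOrder uint32
}

// prepare accepts the envelope so it can read both the payload and the
// metadata, mirroring the new prepareRewardTransaction signature.
func prepare(info *envelope) string {
	rTx := info.Reward
	return fmt.Sprintf("value=%s order=%d", rTx.Value, info.ExecutionOrder)
}

func main() {
	fmt.Println(prepare(&envelope{Reward: payload{Value: "10"}, ExecutionOrder: 2}))
}
```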
"github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-core-go/data/receipt" - "github.com/multiversx/mx-chain-core-go/data/rewardTx" "github.com/multiversx/mx-chain-es-indexer-go/data" "github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer" "github.com/multiversx/mx-chain-es-indexer-go/process/elasticproc/converters" @@ -122,17 +121,19 @@ func (dtb *dbTransactionBuilder) prepareTransaction( Version: tx.Version, GuardianAddress: guardianAddress, GuardianSignature: hex.EncodeToString(tx.GuardianSignature), + ExecutionOrder: int(txInfo.ExecutionOrder), } } func (dtb *dbTransactionBuilder) prepareRewardTransaction( - rTx *rewardTx.RewardTx, + rTxInfo *outport.RewardInfo, txHash []byte, mbHash []byte, mb *block.MiniBlock, header coreData.HeaderHandler, txStatus string, ) *data.Transaction { + rTx := rTxInfo.Reward valueNum, err := dtb.balanceConverter.ConvertBigValueToFloat(rTx.Value) if err != nil { log.Warn("dbTransactionBuilder.prepareRewardTransaction cannot compute value as num", "value", rTx.Value, @@ -142,23 +143,24 @@ func (dtb *dbTransactionBuilder) prepareRewardTransaction( receiverAddr := dtb.addressPubkeyConverter.SilentEncode(rTx.RcvAddr, log) return &data.Transaction{ - Hash: hex.EncodeToString(txHash), - MBHash: hex.EncodeToString(mbHash), - Nonce: 0, - Round: rTx.Round, - Value: rTx.Value.String(), - ValueNum: valueNum, - Receiver: receiverAddr, - Sender: fmt.Sprintf("%d", core.MetachainShardId), - ReceiverShard: mb.ReceiverShardID, - SenderShard: mb.SenderShardID, - GasPrice: 0, - GasLimit: 0, - Data: make([]byte, 0), - Signature: "", - Timestamp: time.Duration(header.GetTimeStamp()), - Status: txStatus, - Operation: rewardsOperation, + Hash: hex.EncodeToString(txHash), + MBHash: hex.EncodeToString(mbHash), + Nonce: 0, + Round: rTx.Round, + Value: rTx.Value.String(), + ValueNum: valueNum, + Receiver: receiverAddr, + Sender: fmt.Sprintf("%d", core.MetachainShardId), + ReceiverShard: mb.ReceiverShardID, + SenderShard: mb.SenderShardID, + GasPrice: 0, + GasLimit: 0, + Data: make([]byte, 0), + Signature: "", + Timestamp: time.Duration(header.GetTimeStamp()), + Status: txStatus, + Operation: rewardsOperation, + ExecutionOrder: int(rTxInfo.ExecutionOrder), } } diff --git a/process/elasticproc/transactions/transactionDBBuilder_test.go b/process/elasticproc/transactions/transactionDBBuilder_test.go index 5b3d8759..ceeca3de 100644 --- a/process/elasticproc/transactions/transactionDBBuilder_test.go +++ b/process/elasticproc/transactions/transactionDBBuilder_test.go @@ -116,7 +116,10 @@ func TestGetTransactionByType_RewardTx(t *testing.T) { header := &block.Header{Nonce: 2} status := "Success" - resultTx := cp.prepareRewardTransaction(rwdTx, txHash, mbHash, mb, header, status) + rewardInfo := &outport.RewardInfo{ + Reward: rwdTx, + } + resultTx := cp.prepareRewardTransaction(rewardInfo, txHash, mbHash, mb, header, status) expectedTx := &data.Transaction{ Hash: hex.EncodeToString(txHash), MBHash: hex.EncodeToString(mbHash), diff --git a/process/elasticproc/transactions/transactionsGrouper.go b/process/elasticproc/transactions/transactionsGrouper.go index c6073914..5aaf15a4 100644 --- a/process/elasticproc/transactions/transactionsGrouper.go +++ b/process/elasticproc/transactions/transactionsGrouper.go @@ -151,7 +151,7 @@ func (tg *txsGrouper) prepareRewardTxForDB( return nil, false } - dbTx := tg.txBuilder.prepareRewardTransaction(rtx.Reward, txHash, mbHash, mb, header, mbStatus) + dbTx := 
diff --git a/scripts/script.sh b/scripts/script.sh
index 83b0867c..11a1bdb4 100755
--- a/scripts/script.sh
+++ b/scripts/script.sh
@@ -4,7 +4,7 @@ PROMETHEUS_CONTAINER_NAME=prometheus_container
GRAFANA_CONTAINER_NAME=grafana_container
GRAFANA_VERSION=10.0.3
PROMETHEUS_VERSION=v2.46.0
-INDICES_LIST=("rating" "transactions" "blocks" "validators" "miniblocks" "rounds" "accounts" "accountshistory" "receipts" "scresults" "accountsesdt" "accountsesdthistory" "epochinfo" "scdeploys" "tokens" "tags" "logs" "delegators" "operations" "esdts")
+INDICES_LIST=("rating" "transactions" "blocks" "validators" "miniblocks" "rounds" "accounts" "accountshistory" "receipts" "scresults" "accountsesdt" "accountsesdthistory" "epochinfo" "scdeploys" "tokens" "tags" "logs" "delegators" "operations" "esdts" "values" "events")
 
start() {
diff --git a/templates/noKibana/events.go b/templates/noKibana/events.go
new file mode 100644
index 00000000..bbdce60e
--- /dev/null
+++ b/templates/noKibana/events.go
@@ -0,0 +1,54 @@
+package noKibana
+
+// Events will hold the configuration for the events index
+var Events = Object{
+	"index_patterns": Array{
+		"events-*",
+	},
+	"settings": Object{
+		"number_of_shards":   5,
+		"number_of_replicas": 0,
+	},
+	"mappings": Object{
+		"properties": Object{
+			"txHash": Object{
+				"type": "keyword",
+			},
+			"originalTxHash": Object{
+				"type": "keyword",
+			},
+			"logAddress": Object{
+				"type": "keyword",
+			},
+			"address": Object{
+				"type": "keyword",
+			},
+			"identifier": Object{
+				"type": "keyword",
+			},
+			"shardID": Object{
+				"type": "long",
+			},
+			"data": Object{
+				"index": "false",
+				"type":  "text",
+			},
+			"additionalData": Object{
+				"type": "text",
+			},
+			"topics": Object{
+				"type": "text",
+			},
+			"order": Object{
+				"type": "long",
+			},
+			"txOrder": Object{
+				"type": "long",
+			},
+			"timestamp": Object{
+				"type":   "date",
+				"format": "epoch_second",
+			},
+		},
+	},
+}
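Given the mapping above, a document stored in an `events-*` index would look roughly like this (hand-assembled from the values used in `TestPrepareLogsAndEvents_LogEvents`; note that `data` is stored but not indexed, and `timestamp` is read as epoch seconds):

```go
package main

import "fmt"

func main() {
	// Hand-written sample document for the events index, shaped after the
	// template above; the field values come from the test expectations.
	const sampleEventDoc = `{
  "txHash": "747848617368",
  "originalTxHash": "originalHash",
  "logAddress": "61646472657373",
  "address": "61646472",
  "identifier": "ESDTNFTTransfer",
  "shardID": 1,
  "data": "",
  "additionalData": ["736f6d657468696e67"],
  "topics": ["6d792d746f6b656e", "01", "7265636569766572"],
  "order": 0,
  "txOrder": 0,
  "timestamp": 1234
}`

	fmt.Println(sampleEventDoc)
}
```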
"properties": Object{ + "key": Object{ + "type": "keyword", + }, + "value": Object{ + "type": "keyword", + }, + }, + }, +} diff --git a/tools/clusters-checker/pkg/checkers/process_indices_with_timestamp.go b/tools/clusters-checker/pkg/checkers/process_indices_with_timestamp.go index b925759a..08eb98a7 100644 --- a/tools/clusters-checker/pkg/checkers/process_indices_with_timestamp.go +++ b/tools/clusters-checker/pkg/checkers/process_indices_with_timestamp.go @@ -42,9 +42,12 @@ func (cc *clusterChecker) CompareIndicesWithTimestamp() error { func (cc *clusterChecker) compareIndexWithTimestamp(index string) error { rspSource := &generalElasticResponse{} + + withSource := !cc.onlyIDs + nextScrollIDSource, doneSource, err := cc.clientSource.InitializeScroll( index, - getAllSortTimestampASC(true, cc.startTimestamp, cc.stopTimestamp), + getAllSortTimestampASC(withSource, cc.startTimestamp, cc.stopTimestamp), rspSource, ) if err != nil { @@ -54,7 +57,7 @@ func (cc *clusterChecker) compareIndexWithTimestamp(index string) error { rspDestination := &generalElasticResponse{} nextScrollIDDestination, doneDestination, err := cc.clientDestination.InitializeScroll( index, - getAllSortTimestampASC(true, cc.startTimestamp, cc.stopTimestamp), + getAllSortTimestampASC(withSource, cc.startTimestamp, cc.stopTimestamp), rspDestination, ) if err != nil { diff --git a/tools/clusters-checker/pkg/checkers/query.go b/tools/clusters-checker/pkg/checkers/query.go index bf090db0..de07ed0d 100644 --- a/tools/clusters-checker/pkg/checkers/query.go +++ b/tools/clusters-checker/pkg/checkers/query.go @@ -36,8 +36,8 @@ func getAllSortTimestampASC(withSource bool, start, stop int) []byte { "query": object{ "range": object{ "timestamp": object{ - "gte": start, - "lte": stop, + "gte": fmt.Sprintf("%d", start), + "lte": fmt.Sprintf("%d", stop), }, }, }, @@ -45,7 +45,7 @@ func getAllSortTimestampASC(withSource bool, start, stop int) []byte { "sort": []interface{}{ object{ "timestamp": object{ - "order": "asc", + "order": "desc", }, }, },