Skip to content

Commit

Permalink
Merge branch 'master' into docker-build-action
Browse files Browse the repository at this point in the history
  • Loading branch information
iulianpascalau committed Jul 30, 2024
2 parents 70ffce2 + 6608ee8 commit 30bfcb3
Show file tree
Hide file tree
Showing 56 changed files with 1,248 additions and 176 deletions.
41 changes: 40 additions & 1 deletion client/elasticClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,14 @@ func (ec *elasticClient) DoQueryRemove(ctx context.Context, index string, body *
log.Warn("elasticClient.doRefresh", "cannot do refresh", err)
}

writeIndex, err := ec.getWriteIndex(index)
if err != nil {
log.Warn("elasticClient.getWriteIndex", "cannot do get write index", err)
return err
}

res, err := ec.client.DeleteByQuery(
[]string{index},
[]string{writeIndex},
body,
ec.client.DeleteByQuery.WithIgnoreUnavailable(true),
ec.client.DeleteByQuery.WithConflicts(esConflictsPolicy),
Expand Down Expand Up @@ -323,6 +329,39 @@ func (ec *elasticClient) createAlias(alias string, index string) error {
return parseResponse(res, nil, elasticDefaultErrorResponseHandler)
}

func (ec *elasticClient) getWriteIndex(alias string) (string, error) {
res, err := ec.client.Indices.GetAlias(
ec.client.Indices.GetAlias.WithIndex(alias),
)
if err != nil {
return "", err
}

var indexData map[string]struct {
Aliases map[string]struct {
IsWriteIndex bool `json:"is_write_index"`
} `json:"aliases"`
}
err = parseResponse(res, &indexData, elasticDefaultErrorResponseHandler)
if err != nil {
return "", err
}

for index, details := range indexData {
if len(indexData) == 1 {
return index, nil
}

for _, indexAlias := range details.Aliases {
if indexAlias.IsWriteIndex {
return index, nil
}
}
}

return alias, nil
}

// UpdateByQuery will update all the documents that match the provided query from the provided index
func (ec *elasticClient) UpdateByQuery(ctx context.Context, index string, buff *bytes.Buffer) error {
reader := bytes.NewReader(buff.Bytes())
Expand Down
2 changes: 1 addition & 1 deletion client/elasticClientCommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func elasticBulkRequestResponseHandler(res *esapi.Response) error {
return fmt.Errorf("%s", res.String())
}

bodyBytes, err := ioutil.ReadAll(res.Body)
bodyBytes, err := io.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("%w cannot read elastic response body bytes", err)
}
Expand Down
52 changes: 50 additions & 2 deletions client/elasticClient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package client

import (
"context"
"io/ioutil"
"io"
"net/http"
"net/http/httptest"
"os"
Expand Down Expand Up @@ -53,7 +53,7 @@ func TestElasticClient_DoMultiGet(t *testing.T) {
jsonFile, err := os.Open("./testsData/response-multi-get.json")
require.Nil(t, err)

byteValue, _ := ioutil.ReadAll(jsonFile)
byteValue, _ := io.ReadAll(jsonFile)
_, _ = w.Write(byteValue)
}

Expand All @@ -75,3 +75,51 @@ func TestElasticClient_DoMultiGet(t *testing.T) {
_, ok := resMap["docs"]
require.True(t, ok)
}

func TestElasticClient_GetWriteIndexMultipleIndicesBehind(t *testing.T) {
handler := http.NotFound
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handler(w, r)
}))
defer ts.Close()

handler = func(w http.ResponseWriter, r *http.Request) {
jsonFile, err := os.Open("./testsData/response-get-alias.json")
require.Nil(t, err)

byteValue, _ := io.ReadAll(jsonFile)
_, _ = w.Write(byteValue)
}

esClient, _ := NewElasticClient(elasticsearch.Config{
Addresses: []string{ts.URL},
Logger: &logging.CustomLogger{},
})
res, err := esClient.getWriteIndex("blocks")
require.Nil(t, err)
require.Equal(t, "blocks-000004", res)
}

func TestElasticClient_GetWriteIndexOneIndex(t *testing.T) {
handler := http.NotFound
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handler(w, r)
}))
defer ts.Close()

handler = func(w http.ResponseWriter, r *http.Request) {
jsonFile, err := os.Open("./testsData/response-get-alias-only-one-index.json")
require.Nil(t, err)

byteValue, _ := io.ReadAll(jsonFile)
_, _ = w.Write(byteValue)
}

esClient, _ := NewElasticClient(elasticsearch.Config{
Addresses: []string{ts.URL},
Logger: &logging.CustomLogger{},
})
res, err := esClient.getWriteIndex("delegators")
require.Nil(t, err)
require.Equal(t, "delegators-000001", res)
}
7 changes: 7 additions & 0 deletions client/testsData/response-get-alias-only-one-index.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"delegators-000001" : {
"aliases" : {
"delegators" : { }
}
}
}
30 changes: 30 additions & 0 deletions client/testsData/response-get-alias.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"blocks-000003": {
"aliases": {
"blocks": {
"is_write_index": false
}
}
},
"blocks-000004": {
"aliases": {
"blocks": {
"is_write_index": true
}
}
},
"blocks-000002": {
"aliases": {
"blocks": {
"is_write_index": false
}
}
},
"blocks-000001": {
"aliases": {
"blocks": {
"is_write_index": false
}
}
}
}
2 changes: 1 addition & 1 deletion cmd/elasticindexer/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
available-indices = [
"rating", "transactions", "blocks", "validators", "miniblocks", "rounds", "accounts", "accountshistory",
"receipts", "scresults", "accountsesdt", "accountsesdthistory", "epochinfo", "scdeploys", "tokens", "tags",
"logs", "delegators", "operations", "esdts"
"logs", "delegators", "operations", "esdts", "values", "events"
]
[config.address-converter]
length = 32
Expand Down
6 changes: 0 additions & 6 deletions cmd/elasticindexer/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,4 @@ var (
Name: "disable-ansi-color",
Usage: "Boolean option for disabling ANSI colors in the logging system.",
}
importDB = cli.BoolFlag{
Name: "import-db",
Usage: "This flag, when enabled, triggers the indexer to operate in import database mode. In this mode," +
" the indexer excludes the indexing of cross shard transactions received from the source shard. " +
"This flag must be enabled when the observers are in import database mode.",
}
)
17 changes: 14 additions & 3 deletions cmd/elasticindexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ VERSION:
`
)

// appVersion should be populated at build time using ldflags
// Usage examples:
// linux/mac:
//
// go build -v -ldflags="-X main.appVersion=$(git describe --tags --long --dirty)"
//
// windows:
//
// for /f %i in ('git describe --tags --long --dirty') do set VERS=%i
// go build -v -ldflags="-X main.version=%VERS%"
var version = "undefined"

func main() {
app := cli.NewApp()
cli.AppHelpTemplate = helpTemplate
Expand All @@ -51,7 +63,6 @@ func main() {
logLevel,
logSaveFile,
disableAnsiColor,
importDB,
}
app.Authors = []cli.Author{
{
Expand All @@ -60,6 +71,7 @@ func main() {
},
}

app.Version = version
app.Action = startIndexer

err := app.Run(os.Args)
Expand All @@ -85,9 +97,8 @@ func startIndexer(ctx *cli.Context) error {
return fmt.Errorf("%w while initializing the logger", err)
}

importDBMode := ctx.GlobalBool(importDB.Name)
statusMetrics := metrics.NewStatusMetrics()
wsHost, err := factory.CreateWsIndexer(cfg, clusterCfg, importDBMode, statusMetrics)
wsHost, err := factory.CreateWsIndexer(cfg, clusterCfg, statusMetrics, ctx.App.Version)
if err != nil {
return fmt.Errorf("%w while creating the indexer", err)
}
Expand Down
6 changes: 6 additions & 0 deletions data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,9 @@ type ResponseScroll struct {
} `json:"hits"`
} `json:"hits"`
}

// KeyValueObj is the dto for values index
type KeyValueObj struct {
Key string `json:"key"`
Value string `json:"value"`
}
20 changes: 20 additions & 0 deletions data/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package data

import "time"

// LogEvent is the dto for the log event structure
type LogEvent struct {
ID string `json:"-"`
TxHash string `json:"txHash"`
OriginalTxHash string `json:"originalTxHash,omitempty"`
LogAddress string `json:"logAddress"`
Address string `json:"address"`
Identifier string `json:"identifier"`
Data string `json:"data,omitempty"`
AdditionalData []string `json:"additionalData,omitempty"`
Topics []string `json:"topics"`
Order int `json:"order"`
TxOrder int `json:"txOrder"`
ShardID uint32 `json:"shardID"`
Timestamp time.Duration `json:"timestamp,omitempty"`
}
2 changes: 2 additions & 0 deletions data/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ type PreparedLogsResults struct {
TokensInfo []*TokenInfo
NFTsDataUpdates []*NFTDataUpdate
TokenRolesAndProperties *tokeninfo.TokenRolesAndProperties
DBLogs []*Logs
DBEvents []*LogEvent
}
1 change: 1 addition & 0 deletions data/scresult.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type ScResult struct {
CanBeIgnored bool `json:"canBeIgnored,omitempty"`
OriginalSender string `json:"originalSender,omitempty"`
HasLogs bool `json:"hasLogs,omitempty"`
ExecutionOrder int `json:"-"`
SenderAddressBytes []byte `json:"-"`
InitialTxGasUsed uint64 `json:"-"`
InitialTxFee string `json:"-"`
Expand Down
5 changes: 3 additions & 2 deletions data/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
)

// Transaction is a structure containing all the fields that need
// to be saved for a transaction. It has all the default fields
// plus some extra information for ease of search and filter
// to be saved for a transaction. It has all the default fields
// plus some extra information for ease of search and filter
type Transaction struct {
MBHash string `json:"miniBlockHash"`
Nonce uint64 `json:"nonce"`
Expand Down Expand Up @@ -48,6 +48,7 @@ type Transaction struct {
GuardianSignature string `json:"guardianSignature,omitempty"`
ErrorEvent bool `json:"errorEvent,omitempty"`
CompletedEvent bool `json:"completedEvent,omitempty"`
ExecutionOrder int `json:"-"`
SmartContractResults []*ScResult `json:"-"`
Hash string `json:"-"`
BlockHash string `json:"-"`
Expand Down
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
version: "3.0"
services:
elasticsearch:
container_name: es-container
Expand Down
8 changes: 4 additions & 4 deletions factory/wsIndexerFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import (
var log = logger.GetOrCreate("elasticindexer")

// CreateWsIndexer will create a new instance of wsindexer.WSClient
func CreateWsIndexer(cfg config.Config, clusterCfg config.ClusterConfig, importDB bool, statusMetrics core.StatusMetricsHandler) (wsindexer.WSClient, error) {
func CreateWsIndexer(cfg config.Config, clusterCfg config.ClusterConfig, statusMetrics core.StatusMetricsHandler, version string) (wsindexer.WSClient, error) {
wsMarshaller, err := factoryMarshaller.NewMarshalizer(clusterCfg.Config.WebSocket.DataMarshallerType)
if err != nil {
return nil, err
}

dataIndexer, err := createDataIndexer(cfg, clusterCfg, wsMarshaller, importDB, statusMetrics)
dataIndexer, err := createDataIndexer(cfg, clusterCfg, wsMarshaller, statusMetrics, version)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -55,8 +55,8 @@ func createDataIndexer(
cfg config.Config,
clusterCfg config.ClusterConfig,
wsMarshaller marshal.Marshalizer,
importDB bool,
statusMetrics core.StatusMetricsHandler,
version string,
) (wsindexer.DataIndexer, error) {
marshaller, err := factoryMarshaller.NewMarshalizer(cfg.Config.Marshaller.Type)
if err != nil {
Expand Down Expand Up @@ -88,8 +88,8 @@ func createDataIndexer(
AddressPubkeyConverter: addressPubkeyConverter,
ValidatorPubkeyConverter: validatorPubkeyConverter,
HeaderMarshaller: wsMarshaller,
ImportDB: importDB,
StatusMetrics: statusMetrics,
Version: version,
})
}

Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ require (
github.com/elastic/go-elasticsearch/v7 v7.12.0
github.com/gin-contrib/cors v1.4.0
github.com/gin-gonic/gin v1.9.1
github.com/multiversx/mx-chain-communication-go v1.0.13-0.20240126121117-627adccf10ad
github.com/multiversx/mx-chain-core-go v1.2.19-0.20240129082057-a76d0c995cf2
github.com/multiversx/mx-chain-logger-go v1.0.14-0.20240129144507-d00e967c890c
github.com/multiversx/mx-chain-vm-common-go v1.5.12-0.20240129145149-4fe61574f566
github.com/multiversx/mx-chain-communication-go v1.0.14
github.com/multiversx/mx-chain-core-go v1.2.19
github.com/multiversx/mx-chain-logger-go v1.0.14
github.com/multiversx/mx-chain-vm-common-go v1.5.12
github.com/prometheus/client_model v0.4.0
github.com/prometheus/common v0.37.0
github.com/stretchr/testify v1.8.4
Expand Down
18 changes: 9 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -247,15 +247,15 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/multiversx/mx-chain-communication-go v1.0.13-0.20240126121117-627adccf10ad h1:izxTyKCxvT7z2mhXCWAZibSxwRVgLmq/kDovs4Nx/6Y=
github.com/multiversx/mx-chain-communication-go v1.0.13-0.20240126121117-627adccf10ad/go.mod h1:n4E8BWIV0g3AcNGe1gf+vcjUC8A2QCJ4ARQSbiUDGrI=
github.com/multiversx/mx-chain-core-go v1.2.19-0.20240129082057-a76d0c995cf2 h1:pFh9bwOTRgW173aHqA8Bmax+jYzLnRyXqRvi5alF7V4=
github.com/multiversx/mx-chain-core-go v1.2.19-0.20240129082057-a76d0c995cf2/go.mod h1:B5zU4MFyJezmEzCsAHE9YNULmGCm2zbPHvl9hazNxmE=
github.com/multiversx/mx-chain-crypto-go v1.2.10-0.20231129101537-ef355850e34b h1:TIE6it719ZIW0E1bFgPAgE+U3zPSkPfAloFYEIeOL3U=
github.com/multiversx/mx-chain-logger-go v1.0.14-0.20240129144507-d00e967c890c h1:QIUOn8FgNRa5cir4BCWHZi/Qcr6Gg0eGNhns4+jy6+k=
github.com/multiversx/mx-chain-logger-go v1.0.14-0.20240129144507-d00e967c890c/go.mod h1:fH/fR/GEBsDjPkBoZDVJMoYo2HhlA7++DP6QfITJ1N8=
github.com/multiversx/mx-chain-vm-common-go v1.5.12-0.20240129145149-4fe61574f566 h1:zImJa/r6B5L2OLWbKTn5io53U11PPGDla12H2OaJ9y0=
github.com/multiversx/mx-chain-vm-common-go v1.5.12-0.20240129145149-4fe61574f566/go.mod h1:OUyhCFqZKqUk1uaPsenyPDwO1830SlHNDU7Q7b6CBVI=
github.com/multiversx/mx-chain-communication-go v1.0.14 h1:YhAUDjBBpc5h5W0A7LHLXUMIMeCgwgGvkqfAPbFqsno=
github.com/multiversx/mx-chain-communication-go v1.0.14/go.mod h1:qYCqgk0h+YpcTA84jHIpCBy6UShRwmXzHSCcdfwNrkw=
github.com/multiversx/mx-chain-core-go v1.2.19 h1:2BaVHkB0tro3cjs5ay2pmLup1loCV0e1p9jV5QW0xqc=
github.com/multiversx/mx-chain-core-go v1.2.19/go.mod h1:B5zU4MFyJezmEzCsAHE9YNULmGCm2zbPHvl9hazNxmE=
github.com/multiversx/mx-chain-crypto-go v1.2.11 h1:MNPJoiTJA5/tedYrI0N22OorbsKDESWG0SF8MCJwcJI=
github.com/multiversx/mx-chain-logger-go v1.0.14 h1:PRMpAvXE7Nec2d//QNmbYfKVHMomOKmcN4UXurQWX9o=
github.com/multiversx/mx-chain-logger-go v1.0.14/go.mod h1:bDfHSdwqIimn7Gp8w+SH5KlDuGzJ//nlyEANAaTSc3o=
github.com/multiversx/mx-chain-vm-common-go v1.5.12 h1:Q8F6DE7XhgHtWgg2rozSv4Tv5fE3ENkJz6mjRoAfht8=
github.com/multiversx/mx-chain-vm-common-go v1.5.12/go.mod h1:Sv6iS1okB6gy3HAsW6KHYtAxShNAfepKLtu//AURI8c=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
Expand Down
1 change: 1 addition & 0 deletions integrationtests/accountsBalanceNftTransfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func createOutportBlockWithHeader(
TransactionPool: pool,
AlteredAccounts: coreAlteredAccounts,
NumberOfShards: numOfShards,
ShardID: header.GetShardID(),
},
Header: header,
}
Expand Down
Loading

0 comments on commit 30bfcb3

Please sign in to comment.