Skip to content

Commit

Permalink
Merge pull request #227 from multiversx/request-node-setting
Browse files Browse the repository at this point in the history
Request node settings
  • Loading branch information
miiu96 authored Jun 9, 2023
2 parents a1c5839 + 820c92b commit dd828c1
Show file tree
Hide file tree
Showing 13 changed files with 110 additions and 21 deletions.
30 changes: 29 additions & 1 deletion cmd/elasticindexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/core/closing"
"github.com/multiversx/mx-chain-core-go/data/outport"
"github.com/multiversx/mx-chain-es-indexer-go/config"
"github.com/multiversx/mx-chain-es-indexer-go/factory"
"github.com/multiversx/mx-chain-es-indexer-go/process/wsindexer"
logger "github.com/multiversx/mx-chain-logger-go"
"github.com/multiversx/mx-chain-logger-go/file"
"github.com/urfave/cli"
Expand Down Expand Up @@ -90,7 +92,12 @@ func startIndexer(ctx *cli.Context) error {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)

<-interrupt
retryDuration := time.Duration(clusterCfg.Config.WebSocket.RetryDurationInSec) * time.Second
closed := requestSettings(wsHost, retryDuration, interrupt)
if !closed {
<-interrupt
}

log.Info("closing app at user's signal")
err = wsHost.Close()
if err != nil {
Expand All @@ -104,6 +111,27 @@ func startIndexer(ctx *cli.Context) error {
return nil
}

func requestSettings(host wsindexer.WSClient, retryDuration time.Duration, close chan os.Signal) bool {
timer := time.NewTimer(0)
defer timer.Stop()

emptyMessage := make([]byte, 0)
for {
select {
case <-timer.C:
err := host.Send(emptyMessage, outport.TopicSettings)
if err == nil {
return false
}
log.Debug("unable to request settings - will retry", "error", err)

timer.Reset(retryDuration)
case <-close:
return true
}
}
}

func loadMainConfig(filepath string) (config.Config, error) {
cfg := config.Config{}
err := core.LoadTomlFile(&cfg, filepath)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ go 1.17

require (
github.com/elastic/go-elasticsearch/v7 v7.12.0
github.com/multiversx/mx-chain-communication-go v1.0.3-0.20230607101542-b80e6e676772
github.com/multiversx/mx-chain-core-go v1.2.4
github.com/multiversx/mx-chain-communication-go v1.0.3-0.20230607150138-28d0e4242a47
github.com/multiversx/mx-chain-core-go v1.2.6-0.20230529102659-223d4826d177
github.com/multiversx/mx-chain-logger-go v1.0.11
github.com/multiversx/mx-chain-vm-common-go v1.4.1
github.com/stretchr/testify v1.8.2
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -471,13 +471,13 @@ github.com/multiformats/go-varint v0.0.2/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXS
github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
github.com/multiformats/go-varint v0.0.6/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
github.com/multiversx/concurrent-map v0.1.4/go.mod h1:8cWFRJDOrWHOTNSqgYCUvwT7c7eFQ4U2vKMOp4A/9+o=
github.com/multiversx/mx-chain-communication-go v1.0.3-0.20230607101542-b80e6e676772 h1:K5lIIlpjBckiQtNJLCpTHkaYAnEUBA3xSkQ6kC0MpUY=
github.com/multiversx/mx-chain-communication-go v1.0.3-0.20230607101542-b80e6e676772/go.mod h1:OnnSUjnSP87H5MtQtxn33FGnTVRVgo2Huo3ijmCgN2M=
github.com/multiversx/mx-chain-communication-go v1.0.3-0.20230607150138-28d0e4242a47 h1:qizL4OFH8IPumgoP2gKrxamS+Oi6NMKICCrvNRp8Ctw=
github.com/multiversx/mx-chain-communication-go v1.0.3-0.20230607150138-28d0e4242a47/go.mod h1:OnnSUjnSP87H5MtQtxn33FGnTVRVgo2Huo3ijmCgN2M=
github.com/multiversx/mx-chain-core-go v1.1.30/go.mod h1:8gGEQv6BWuuJwhd25qqhCOZbBSv9mk+hLeKvinSaSMk=
github.com/multiversx/mx-chain-core-go v1.2.1-0.20230510143029-ab37792342df/go.mod h1:jzYFSiYBuO0dGpGFXnZWSwcwcKP7Flyn/X41y4zIQrQ=
github.com/multiversx/mx-chain-core-go v1.2.1/go.mod h1:8gGEQv6BWuuJwhd25qqhCOZbBSv9mk+hLeKvinSaSMk=
github.com/multiversx/mx-chain-core-go v1.2.4 h1:BRXyajUevLU6zHszR8jnp2+7C2bAQBor51YTc4dp3YQ=
github.com/multiversx/mx-chain-core-go v1.2.4/go.mod h1:jzYFSiYBuO0dGpGFXnZWSwcwcKP7Flyn/X41y4zIQrQ=
github.com/multiversx/mx-chain-core-go v1.2.6-0.20230529102659-223d4826d177 h1:VVvMV4KufAF1uNxdTLODWwT+l+l2m4Cek8Dc4LgCo60=
github.com/multiversx/mx-chain-core-go v1.2.6-0.20230529102659-223d4826d177/go.mod h1:jzYFSiYBuO0dGpGFXnZWSwcwcKP7Flyn/X41y4zIQrQ=
github.com/multiversx/mx-chain-crypto-go v1.2.6/go.mod h1:rOj0Rr19HTOYt9YTeym7RKxlHt91NXln3LVKjHKVmA0=
github.com/multiversx/mx-chain-logger-go v1.0.11 h1:DFsHa+sc5fKwhDR50I8uBM99RTDTEW68ESyr5ALRDwE=
github.com/multiversx/mx-chain-logger-go v1.0.11/go.mod h1:1srDkP0DQucWQ+rYfaq0BX2qLnULsUdRPADpYUTM6dA=
Expand Down
5 changes: 5 additions & 0 deletions mock/elasticProcessorStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ func (eim *ElasticProcessorStub) SaveAccounts(accounts *outport.Accounts) error
return nil
}

// SetOutportConfig -
func (eim *ElasticProcessorStub) SetOutportConfig(_ outport.OutportConfig) error {
return nil
}

// IsInterfaceNil returns true if there is no value under the interface
func (eim *ElasticProcessorStub) IsInterfaceNil() bool {
return eim == nil
Expand Down
12 changes: 12 additions & 0 deletions process/dataindexer/dataIndexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,18 @@ func (di *dataIndexer) GetMarshaller() marshal.Marshalizer {
return di.headerMarshaller
}

// RegisterHandlerForSettingsRequest will do nothing
func (di *dataIndexer) RegisterHandlerForSettingsRequest(_ func()) error {
return nil
}

// SetCurrentSettings will set the provided settings
func (di *dataIndexer) SetCurrentSettings(cfg outport.OutportConfig) error {
log.Debug("dataIndexer.SetCurrentSettings", "importDBMode", cfg.IsInImportDBMode)

return di.elasticProcessor.SetOutportConfig(cfg)
}

// IsInterfaceNil returns true if there is no value under the interface
func (di *dataIndexer) IsInterfaceNil() bool {
return di == nil
Expand Down
3 changes: 3 additions & 0 deletions process/dataindexer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type ElasticProcessor interface {
SaveRoundsInfo(rounds *outport.RoundsInfo) error
SaveShardValidatorsPubKeys(validatorsPubKeys *outport.ValidatorsPubKeys) error
SaveAccounts(accounts *outport.Accounts) error
SetOutportConfig(cfg outport.OutportConfig) error
IsInterfaceNil() bool
}

Expand Down Expand Up @@ -55,6 +56,8 @@ type Indexer interface {
SaveAccounts(accountsData *outport.Accounts) error
FinalizedBlock(finalizedBlock *outport.FinalizedBlock) error
GetMarshaller() marshal.Marshalizer
RegisterHandlerForSettingsRequest(handler func()) error
SetCurrentSettings(cfg outport.OutportConfig) error
Close() error
IsInterfaceNil() bool
}
Expand Down
23 changes: 21 additions & 2 deletions process/elasticproc/elasticProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/hex"
"fmt"
"sync"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
Expand Down Expand Up @@ -63,6 +64,7 @@ type elasticProcessor struct {
bulkRequestMaxSize int
importDB bool
enabledIndexes map[string]struct{}
mutex sync.RWMutex
elasticClient DatabaseClientHandler
accountsProc DBAccountHandler
blockProc DBBlockHandler
Expand Down Expand Up @@ -408,7 +410,7 @@ func (ei *elasticProcessor) miniblocksInDBMap(mbs []*data.Miniblock) (map[string
func (ei *elasticProcessor) SaveTransactions(obh *outport.OutportBlockWithHeader) error {
headerTimestamp := obh.Header.GetTimeStamp()

preparedResults := ei.transactionsProc.PrepareTransactionsForDatabase(obh.BlockData.Body, obh.Header, obh.TransactionPool, ei.importDB, obh.NumberOfShards)
preparedResults := ei.transactionsProc.PrepareTransactionsForDatabase(obh.BlockData.Body, obh.Header, obh.TransactionPool, ei.isImportDB(), obh.NumberOfShards)
logsData := ei.logsAndEventsProc.ExtractDataFromLogs(obh.TransactionPool.Logs, preparedResults, headerTimestamp, obh.Header.GetShardID(), obh.NumberOfShards)

buffers := data.NewBufferSlice(ei.bulkRequestMaxSize)
Expand All @@ -417,7 +419,7 @@ func (ei *elasticProcessor) SaveTransactions(obh *outport.OutportBlockWithHeader
return err
}

err = ei.prepareAndIndexOperations(preparedResults.Transactions, preparedResults.TxHashStatus, obh.Header, preparedResults.ScResults, buffers, ei.importDB)
err = ei.prepareAndIndexOperations(preparedResults.Transactions, preparedResults.TxHashStatus, obh.Header, preparedResults.ScResults, buffers, ei.isImportDB())
if err != nil {
return err
}
Expand Down Expand Up @@ -815,6 +817,23 @@ func (ei *elasticProcessor) doBulkRequests(index string, buffSlice []*bytes.Buff
return nil
}

// SetOutportConfig will set the outport config
func (ei *elasticProcessor) SetOutportConfig(cfg outport.OutportConfig) error {
ei.mutex.Lock()
defer ei.mutex.Unlock()

ei.importDB = cfg.IsInImportDBMode

return nil
}

func (ei *elasticProcessor) isImportDB() bool {
ei.mutex.RLock()
defer ei.mutex.RUnlock()

return ei.importDB
}

// IsInterfaceNil returns true if there is no value under the interface
func (ei *elasticProcessor) IsInterfaceNil() bool {
return ei == nil
Expand Down
11 changes: 11 additions & 0 deletions process/wsindexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (i *indexer) initActionsMap() {
outport.TopicSaveValidatorsPubKeys: i.saveValidatorsPubKeys,
outport.TopicSaveAccounts: i.saveAccounts,
outport.TopicFinalizedBlock: i.finalizedBlock,
outport.TopicSettings: i.setSettings,
}
}

Expand Down Expand Up @@ -127,6 +128,16 @@ func (i *indexer) finalizedBlock(_ []byte) error {
return nil
}

func (i *indexer) setSettings(marshalledData []byte) error {
settings := outport.OutportConfig{}
err := i.marshaller.Unmarshal(&settings, marshalledData)
if err != nil {
return err
}

return i.di.SetCurrentSettings(settings)
}

// Close will close the indexer
func (i *indexer) Close() error {
return i.di.Close()
Expand Down
2 changes: 2 additions & 0 deletions process/wsindexer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

// WSClient defines what a websocket client should do
type WSClient interface {
Send(message []byte, topic string) error
Close() error
}

Expand All @@ -18,6 +19,7 @@ type DataIndexer interface {
SaveValidatorsRating(ratingData *outport.ValidatorsRating) error
SaveAccounts(accountsData *outport.Accounts) error
FinalizedBlock(finalizedBlock *outport.FinalizedBlock) error
SetCurrentSettings(settings outport.OutportConfig) error
Close() error
IsInterfaceNil() bool
}
4 changes: 3 additions & 1 deletion scripts/observers/.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ MX_CHAIN_DEPLOY_GO_URL=https://github.com/multiversx/mx-chain-deploy-go
MX_CHAIN_PROXY_URL=https://github.com/multiversx/mx-chain-proxy-go

NODE_GO_URL="https://github.com/multiversx/mx-chain-go"
NODE_GO_BRANCH="rc/v1.6.0"
NODE_GO_BRANCH="handle-settings-in-outport-driver"

WORKING_DIRECTORY="IndexerObservers"
OBSERVER_DIR_PREFIX="observer_shard_"
Expand All @@ -16,4 +16,6 @@ OBSERVERS_START_PORT=9500

PROXY_PORT=7950

ACK_TIMEOUT_IN_SECONDS=10

INDEXER_BINARY_SERVER=true
5 changes: 5 additions & 0 deletions scripts/observers/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ help:
@echo " start - Start the local testnet"
@echo " stop - Stop the local testnet"
@echo " clean - Clean up the files of the local testnet"
@echo " cluster - Start an Elasticsearch cluster on port 9200"

config:
python3 config.py
Expand All @@ -14,3 +15,7 @@ stop:
python3 stop.py
clean:
python3 clean.py
cd .. && /bin/bash script.sh delete
cluster:
cd .. && /bin/bash script.sh start

22 changes: 12 additions & 10 deletions scripts/observers/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ def update_toml_indexer(path, shard_id):
prefs_data['config']['web-socket']['mode'] = "server"

if shard_id != METACHAIN:
prefs_data['config']['web-socket']['url'] = "localhost:" + str(port)
prefs_data['config']['web-socket']['url'] = f"localhost:{str(port)}"
else:
prefs_data['config']['web-socket']['url'] = "localhost:" + str(meta_port)
prefs_data['config']['web-socket']['url'] = f"localhost:{str(meta_port)}"
prefs_data['config']['web-socket']['data-marshaller-type'] = str(os.getenv('WS_MARSHALLER_TYPE'))
prefs_data['config']['web-socket']['acknowledge-timeout-in-seconds'] = int(os.getenv('ACK_TIMEOUT_IN_SECONDS'))

f = open(path_prefs, 'w')
toml.dump(prefs_data, f)
Expand Down Expand Up @@ -61,16 +62,17 @@ def update_toml_node(path, shard_id):

is_indexer_server = os.getenv('INDEXER_BINARY_SERVER')
if is_indexer_server:
external_data['HostDriverConfig']['IsServer'] = False
external_data['HostDriverConfig']['Mode'] = "client"
port = WS_PORT_BASE
meta_port = WS_PORT_BASE

if shard_id != METACHAIN:
external_data['HostDriverConfig']['URL'] = "localhost:" + str(port)
external_data['HostDriverConfig']['URL'] = f"localhost:{str(port)}"
else:
external_data['HostDriverConfig']['URL'] = "localhost:" + str(meta_port)
external_data['HostDriverConfig']['URL'] = f"localhost:{str(meta_port)}"

external_data['HostDriverConfig']['MarshallerType'] = str(os.getenv('WS_MARSHALLER_TYPE'))
external_data['HostDriverConfig']['AcknowledgeTimeoutInSec'] = int(os.getenv('ACK_TIMEOUT_IN_SECONDS'))
f = open(path_external, 'w')
toml.dump(external_data, f)
f.close()
Expand Down Expand Up @@ -197,10 +199,10 @@ def prepare_proxy(working_dir):
config_data['Observers'].append(meta_observer)

num_of_shards = int(os.getenv('NUM_OF_SHARDS'))
for shardID in range(num_of_shards):
shard_observer_port = observers_start_port + shardID + 1
for shard_id in range(num_of_shards):
shard_observer_port = observers_start_port + shard_id + 1
meta_observer = {
'ShardId': shardID,
'ShardId': shard_id,
'Address': f'http://127.0.0.1:{shard_observer_port}',
}
config_data['Observers'].append(meta_observer)
Expand Down Expand Up @@ -262,8 +264,8 @@ def main():
prepare_observer(METACHAIN, working_dir, config_folder)
prepare_indexer_server(METACHAIN, working_dir)

for shardID in range(num_of_shards):
prepare_observer(shardID, working_dir, config_folder)
for shard_id in range(num_of_shards):
prepare_observer(shard_id, working_dir, config_folder)


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion scripts/observers/stop.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def main():
os.system(f'screen -X -S indexer{shard_id} quit')

if is_indexer_server:
os.system(f'screen -X -S indexerserver quit')
os.system("screen -X -S indexerserver quit")

print("done")

Expand Down

0 comments on commit dd828c1

Please sign in to comment.