From 8051ea9a8cdbade643419d5dda54bd2e7a50b9d2 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Fri, 15 Dec 2023 17:44:55 +0200 Subject: [PATCH 01/14] use old txpool structure for v0 --- data/block.go | 18 +++- process/preprocess/eventsPreProcessorV0.go | 84 +++++++++++++----- .../preprocess/eventsPreProcessorV0_test.go | 87 ++++--------------- 3 files changed, 99 insertions(+), 90 deletions(-) diff --git a/data/block.go b/data/block.go index 3861278..db24b67 100644 --- a/data/block.go +++ b/data/block.go @@ -4,6 +4,7 @@ import ( "github.com/multiversx/mx-chain-core-go/core" nodeData "github.com/multiversx/mx-chain-core-go/data" "github.com/multiversx/mx-chain-core-go/data/alteredAccount" + "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-core-go/data/receipt" "github.com/multiversx/mx-chain-core-go/data/rewardTx" @@ -44,10 +45,25 @@ type ArgsSaveBlockData struct { NumberOfShards uint32 } +// OutportBlockDataOld holds the block data that will be received on push events +// TODO: remove on next iterations, new versions will use outport driver structs from +// core repository, which will be backwards compatible from now on +type OutportBlockDataOld struct { + HeaderHash []byte + Body *block.Body + TransactionsPool *TransactionsPool + SignersIndexes []uint64 + NotarizedHeadersHashes []string + HeaderGasConsumption outport.HeaderGasConsumption + AlteredAccounts map[string]*alteredAccount.AlteredAccount + NumberOfShards uint32 + IsImportDB bool +} + // ArgsSaveBlock holds block data with header type type ArgsSaveBlock struct { HeaderType core.HeaderType - ArgsSaveBlockData + OutportBlockDataOld } // LogData holds the data needed for indexing logs and events diff --git a/process/preprocess/eventsPreProcessorV0.go b/process/preprocess/eventsPreProcessorV0.go index fdcea6f..1c76942 100644 --- a/process/preprocess/eventsPreProcessorV0.go +++ b/process/preprocess/eventsPreProcessorV0.go @@ -6,10 +6,10 @@ import ( "github.com/multiversx/mx-chain-core-go/core" coreData "github.com/multiversx/mx-chain-core-go/data" nodeData "github.com/multiversx/mx-chain-core-go/data" - "github.com/multiversx/mx-chain-core-go/data/alteredAccount" "github.com/multiversx/mx-chain-core-go/data/block" "github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-notifier-go/data" + "github.com/multiversx/mx-chain-notifier-go/process" ) // TODO: dismiss this implementation after http integration is fully deprecated @@ -31,18 +31,8 @@ func NewEventsPreProcessorV0(args ArgsEventsPreProcessor) (*eventsPreProcessorV0 // SaveBlock will handle the block info data func (d *eventsPreProcessorV0) SaveBlock(marshalledData []byte) error { - argsBlockS := struct { - HeaderHash []byte - Body *block.Body - TransactionsPool *outport.TransactionPool - SignersIndexes []uint64 - NotarizedHeadersHashes []string - HeaderGasConsumption outport.HeaderGasConsumption - AlteredAccounts map[string]*alteredAccount.AlteredAccount - NumberOfShards uint32 - IsImportDB bool - }{} - err := json.Unmarshal(marshalledData, &argsBlockS) + blockData := &data.OutportBlockDataOld{} + err := json.Unmarshal(marshalledData, &blockData) if err != nil { return err } @@ -52,15 +42,20 @@ func (d *eventsPreProcessorV0) SaveBlock(marshalledData []byte) error { return err } + txsPool, err := d.setTransactionsPool(blockData.TransactionsPool) + if err != nil { + return err + } + saveBlockData := &data.ArgsSaveBlockData{ - HeaderHash: argsBlockS.HeaderHash, - Body: argsBlockS.Body, - SignersIndexes: argsBlockS.SignersIndexes, - NotarizedHeadersHashes: argsBlockS.NotarizedHeadersHashes, - HeaderGasConsumption: &argsBlockS.HeaderGasConsumption, - AlteredAccounts: argsBlockS.AlteredAccounts, - NumberOfShards: argsBlockS.NumberOfShards, - TransactionsPool: argsBlockS.TransactionsPool, + HeaderHash: blockData.HeaderHash, + Body: blockData.Body, + SignersIndexes: blockData.SignersIndexes, + NotarizedHeadersHashes: blockData.NotarizedHeadersHashes, + HeaderGasConsumption: &blockData.HeaderGasConsumption, + AlteredAccounts: blockData.AlteredAccounts, + NumberOfShards: blockData.NumberOfShards, + TransactionsPool: txsPool, Header: header, } @@ -72,6 +67,53 @@ func (d *eventsPreProcessorV0) SaveBlock(marshalledData []byte) error { return nil } +func (d *eventsPreProcessorV0) setTransactionsPool(txsPool *data.TransactionsPool) (*outport.TransactionPool, error) { + if txsPool == nil { + return nil, process.ErrNilTransactionsPool + } + + txs := make(map[string]*outport.TxInfo) + + if txsPool.Txs != nil { + for hash, txHandler := range txsPool.Txs { + txs[hash] = &outport.TxInfo{ + Transaction: txHandler.TransactionHandler, + FeeInfo: &txHandler.FeeInfo, + ExecutionOrder: uint32(txHandler.ExecutionOrder), + } + } + } + + scrs := make(map[string]*outport.SCRInfo) + + if txsPool.Scrs != nil { + for hash, scrHandler := range txsPool.Scrs { + scrs[hash] = &outport.SCRInfo{ + SmartContractResult: scrHandler.TransactionHandler, + FeeInfo: &scrHandler.FeeInfo, + ExecutionOrder: uint32(scrHandler.ExecutionOrder), + } + } + } + + logs := make([]*outport.LogData, 0) + + if txsPool.Logs != nil { + for _, logHandler := range txsPool.Logs { + logs = append(logs, &outport.LogData{ + TxHash: logHandler.TxHash, + Log: logHandler.LogHandler, + }) + } + } + + return &outport.TransactionPool{ + Transactions: txs, + SmartContractResults: scrs, + Logs: logs, + }, nil +} + func (d *eventsPreProcessorV0) getHeader(marshaledData []byte) (nodeData.HeaderHandler, error) { headerTypeStruct := struct { HeaderType core.HeaderType diff --git a/process/preprocess/eventsPreProcessorV0_test.go b/process/preprocess/eventsPreProcessorV0_test.go index 7b32b6a..8f1f464 100644 --- a/process/preprocess/eventsPreProcessorV0_test.go +++ b/process/preprocess/eventsPreProcessorV0_test.go @@ -6,14 +6,11 @@ import ( "testing" coreData "github.com/multiversx/mx-chain-core-go/data" - "github.com/multiversx/mx-chain-core-go/data/block" - "github.com/multiversx/mx-chain-core-go/data/outport" - "github.com/multiversx/mx-chain-core-go/data/smartContractResult" - "github.com/multiversx/mx-chain-core-go/data/transaction" "github.com/multiversx/mx-chain-notifier-go/data" notifierData "github.com/multiversx/mx-chain-notifier-go/data" "github.com/multiversx/mx-chain-notifier-go/mocks" "github.com/multiversx/mx-chain-notifier-go/process/preprocess" + "github.com/multiversx/mx-chain-notifier-go/testdata" "github.com/stretchr/testify/require" ) @@ -23,7 +20,7 @@ func TestPreProcessorV0_SaveBlock(t *testing.T) { t.Run("nil block data", func(t *testing.T) { t.Parallel() - outportBlock := createDefaultOutportBlockV0() + outportBlock := testdata.OutportBlockV0() outportBlock.HeaderType = "invalid" marshalledBlock, _ := json.Marshal(outportBlock) @@ -46,7 +43,7 @@ func TestPreProcessorV0_SaveBlock(t *testing.T) { }, } - outportBlock := createDefaultOutportBlockV0() + outportBlock := testdata.OutportBlockV0() dp, err := preprocess.NewEventsPreProcessorV0(args) require.Nil(t, err) @@ -59,14 +56,27 @@ func TestPreProcessorV0_SaveBlock(t *testing.T) { t.Run("should work", func(t *testing.T) { t.Parallel() - dp, err := preprocess.NewEventsPreProcessorV0(createMockEventsDataPreProcessorArgs()) + args := createMockEventsDataPreProcessorArgs() + + wasCalled := false + args.Facade = &mocks.FacadeStub{ + HandlePushEventsV2Called: func(events data.ArgsSaveBlockData) error { + wasCalled = true + return nil + }, + } + + dp, err := preprocess.NewEventsPreProcessorV0(args) require.Nil(t, err) - outportBlock := createDefaultOutportBlockV0() + outportBlock := testdata.OutportBlockV0() - marshalledBlock, _ := json.Marshal(outportBlock) + marshalledBlock, err := json.Marshal(outportBlock) + require.Nil(t, err) err = dp.SaveBlock(marshalledBlock) require.Nil(t, err) + + require.True(t, wasCalled) }) } @@ -102,62 +112,3 @@ func TestPreProcessorV0_FinalizedBlock(t *testing.T) { err = dp.FinalizedBlock(marshalledBlock) require.Nil(t, err) } - -func createDefaultOutportBlockV0() *notifierData.ArgsSaveBlock { - outportBlock := ¬ifierData.ArgsSaveBlock{ - HeaderType: "Header", - ArgsSaveBlockData: notifierData.ArgsSaveBlockData{ - HeaderHash: []byte("headerHash3"), - Body: &block.Body{ - MiniBlocks: []*block.MiniBlock{ - { - TxHashes: [][]byte{}, - ReceiverShardID: 1, - SenderShardID: 1, - }, - }, - }, - TransactionsPool: &outport.TransactionPool{ - Transactions: map[string]*outport.TxInfo{ - "txHash1": { - Transaction: &transaction.Transaction{ - Nonce: 1, - GasPrice: 1, - GasLimit: 1, - }, - FeeInfo: &outport.FeeInfo{ - GasUsed: 1, - }, - ExecutionOrder: 2, - }, - }, - SmartContractResults: map[string]*outport.SCRInfo{ - "scrHash1": { - SmartContractResult: &smartContractResult.SmartContractResult{ - Nonce: 2, - GasLimit: 2, - GasPrice: 2, - CallType: 2, - }, - FeeInfo: &outport.FeeInfo{ - GasUsed: 2, - }, - ExecutionOrder: 0, - }, - }, - Logs: []*outport.LogData{ - { - Log: &transaction.Log{ - Address: []byte("logaddr1"), - Events: []*transaction.Event{}, - }, - TxHash: "logHash1", - }, - }, - }, - NumberOfShards: 2, - }, - } - - return outportBlock -} From 0d17817b8cf36cff2b7c8e29ff43ffd18f3bdd80 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Fri, 15 Dec 2023 17:45:17 +0200 Subject: [PATCH 02/14] move test data structs to separate package --- {tools => testdata}/testData.go | 91 +++++++++++++++++---------------- 1 file changed, 47 insertions(+), 44 deletions(-) rename {tools => testdata}/testData.go (73%) diff --git a/tools/testData.go b/testdata/testData.go similarity index 73% rename from tools/testData.go rename to testdata/testData.go index 3bcb6b1..aaf3832 100644 --- a/tools/testData.go +++ b/testdata/testData.go @@ -1,4 +1,4 @@ -package tools +package testdata import ( "encoding/json" @@ -7,6 +7,7 @@ import ( "github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-core-go/data/smartContractResult" "github.com/multiversx/mx-chain-core-go/data/transaction" + "github.com/multiversx/mx-chain-notifier-go/data" notifierData "github.com/multiversx/mx-chain-notifier-go/data" ) @@ -34,59 +35,61 @@ func OldSaveBlockData() *notifierData.SaveBlockData { // OutportBlockV0 - func OutportBlockV0() *notifierData.ArgsSaveBlock { - return ¬ifierData.ArgsSaveBlock{ - HeaderType: "Header", - ArgsSaveBlockData: notifierData.ArgsSaveBlockData{ - HeaderHash: []byte("headerHash3"), - Body: &block.Body{ - MiniBlocks: []*block.MiniBlock{ - { - TxHashes: [][]byte{}, - ReceiverShardID: 1, - SenderShardID: 1, - }, + saveBlockData := data.OutportBlockDataOld{ + HeaderHash: []byte("headerHash3"), + Body: &block.Body{ + MiniBlocks: []*block.MiniBlock{ + { + TxHashes: [][]byte{}, + ReceiverShardID: 1, + SenderShardID: 1, }, }, - TransactionsPool: &outport.TransactionPool{ - Transactions: map[string]*outport.TxInfo{ - "txHash1": { - Transaction: &transaction.Transaction{ - Nonce: 1, - GasPrice: 1, - GasLimit: 1, - }, - FeeInfo: &outport.FeeInfo{ - GasUsed: 1, - }, - ExecutionOrder: 2, + }, + TransactionsPool: ¬ifierData.TransactionsPool{ + Txs: map[string]*notifierData.NodeTransaction{ + "txHash1": { + TransactionHandler: &transaction.Transaction{ + Nonce: 1, + GasPrice: 1, + GasLimit: 1, + }, + FeeInfo: outport.FeeInfo{ + GasUsed: 1, }, + ExecutionOrder: 2, }, - SmartContractResults: map[string]*outport.SCRInfo{ - "scrHash1": { - SmartContractResult: &smartContractResult.SmartContractResult{ - Nonce: 2, - GasLimit: 2, - GasPrice: 2, - CallType: 2, - }, - FeeInfo: &outport.FeeInfo{ - GasUsed: 2, - }, - ExecutionOrder: 0, + }, + Scrs: map[string]*notifierData.NodeSmartContractResult{ + "scrHash1": { + TransactionHandler: &smartContractResult.SmartContractResult{ + Nonce: 2, + GasLimit: 2, + GasPrice: 2, + CallType: 2, }, + FeeInfo: outport.FeeInfo{ + GasUsed: 2, + }, + ExecutionOrder: 0, }, - Logs: []*outport.LogData{ - { - Log: &transaction.Log{ - Address: []byte("logaddr1"), - Events: []*transaction.Event{}, - }, - TxHash: "logHash1", + }, + Logs: []*notifierData.LogData{ + { + LogHandler: &transaction.Log{ + Address: []byte("logaddr1"), + Events: []*transaction.Event{}, }, + TxHash: "logHash1", }, }, - NumberOfShards: 2, }, + NumberOfShards: 2, + } + + return &data.ArgsSaveBlock{ + HeaderType: "Header", + OutportBlockDataOld: saveBlockData, } } From 017f72631a370480b3d1a7a029be9ae2ab6b4108 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Fri, 15 Dec 2023 17:48:36 +0200 Subject: [PATCH 03/14] update testing tools --- tools/httpConnector/httpClientWrapper.go | 2 +- tools/httpConnector/main.go | 8 ++++---- tools/wsConnector/main.go | 8 ++++---- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tools/httpConnector/httpClientWrapper.go b/tools/httpConnector/httpClientWrapper.go index 13dcf5a..9471a41 100644 --- a/tools/httpConnector/httpClientWrapper.go +++ b/tools/httpConnector/httpClientWrapper.go @@ -24,7 +24,7 @@ const ( contentTypeKey = "Content-Type" contentTypeValue = "application/json" payloadVersionKey = "version" - payloadVersionValue = "1" + payloadVersionValue = "0" ) type httpClientWrapper struct { diff --git a/tools/httpConnector/main.go b/tools/httpConnector/main.go index c2f70b6..0821814 100644 --- a/tools/httpConnector/main.go +++ b/tools/httpConnector/main.go @@ -3,7 +3,7 @@ package main import ( "fmt" - "github.com/multiversx/mx-chain-notifier-go/tools" + "github.com/multiversx/mx-chain-notifier-go/testdata" ) func main() { @@ -18,19 +18,19 @@ func main() { return } - err = httpClient.Post("/events/push", tools.OutportBlockV0()) + err = httpClient.Post("/events/push", testdata.OutportBlockV0()) if err != nil { fmt.Println(fmt.Errorf("%w in eventNotifier.SaveBlock while posting block data", err)) return } - err = httpClient.Post("/events/revert", tools.RevertBlockV0()) + err = httpClient.Post("/events/revert", testdata.RevertBlockV0()) if err != nil { fmt.Println(fmt.Errorf("%w in eventNotifier.SaveBlock while posting block data", err)) return } - err = httpClient.Post("/events/finalized", tools.FinalizedBlockV0()) + err = httpClient.Post("/events/finalized", testdata.FinalizedBlockV0()) if err != nil { fmt.Println(fmt.Errorf("%w in eventNotifier.SaveBlock while posting block data", err)) return diff --git a/tools/wsConnector/main.go b/tools/wsConnector/main.go index ef053b1..c878858 100644 --- a/tools/wsConnector/main.go +++ b/tools/wsConnector/main.go @@ -8,7 +8,7 @@ import ( "github.com/multiversx/mx-chain-core-go/data/outport" "github.com/multiversx/mx-chain-core-go/marshal" logger "github.com/multiversx/mx-chain-logger-go" - "github.com/multiversx/mx-chain-notifier-go/tools" + "github.com/multiversx/mx-chain-notifier-go/testdata" ) func main() { @@ -19,19 +19,19 @@ func main() { return } - err = wsClient.PushEventsRequest(tools.OutportBlockV1()) + err = wsClient.PushEventsRequest(testdata.OutportBlockV1()) if err != nil { fmt.Println(err.Error()) return } - err = wsClient.RevertEventsRequest(tools.RevertBlockV1()) + err = wsClient.RevertEventsRequest(testdata.RevertBlockV1()) if err != nil { fmt.Println(err.Error()) return } - err = wsClient.FinalizedEventsRequest(tools.FinalizedBlockV1()) + err = wsClient.FinalizedEventsRequest(testdata.FinalizedBlockV1()) if err != nil { fmt.Println(err.Error()) return From 77fb4909bf463c6e5b4faf3d466ce9bcfaf5b9e7 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Fri, 15 Dec 2023 19:36:27 +0200 Subject: [PATCH 04/14] fixes after review: refactorings --- process/preprocess/eventsPreProcessorV0.go | 85 +++++++++++++++------- 1 file changed, 59 insertions(+), 26 deletions(-) diff --git a/process/preprocess/eventsPreProcessorV0.go b/process/preprocess/eventsPreProcessorV0.go index 1c76942..c819b54 100644 --- a/process/preprocess/eventsPreProcessorV0.go +++ b/process/preprocess/eventsPreProcessorV0.go @@ -32,7 +32,7 @@ func NewEventsPreProcessorV0(args ArgsEventsPreProcessor) (*eventsPreProcessorV0 // SaveBlock will handle the block info data func (d *eventsPreProcessorV0) SaveBlock(marshalledData []byte) error { blockData := &data.OutportBlockDataOld{} - err := json.Unmarshal(marshalledData, &blockData) + err := json.Unmarshal(marshalledData, blockData) if err != nil { return err } @@ -42,7 +42,7 @@ func (d *eventsPreProcessorV0) SaveBlock(marshalledData []byte) error { return err } - txsPool, err := d.setTransactionsPool(blockData.TransactionsPool) + txsPool, err := d.parseTransactionsPool(blockData.TransactionsPool) if err != nil { return err } @@ -67,44 +67,24 @@ func (d *eventsPreProcessorV0) SaveBlock(marshalledData []byte) error { return nil } -func (d *eventsPreProcessorV0) setTransactionsPool(txsPool *data.TransactionsPool) (*outport.TransactionPool, error) { +func (d *eventsPreProcessorV0) parseTransactionsPool(txsPool *data.TransactionsPool) (*outport.TransactionPool, error) { if txsPool == nil { return nil, process.ErrNilTransactionsPool } txs := make(map[string]*outport.TxInfo) - if txsPool.Txs != nil { - for hash, txHandler := range txsPool.Txs { - txs[hash] = &outport.TxInfo{ - Transaction: txHandler.TransactionHandler, - FeeInfo: &txHandler.FeeInfo, - ExecutionOrder: uint32(txHandler.ExecutionOrder), - } - } + txs = d.parseTxs(txsPool.Txs) } scrs := make(map[string]*outport.SCRInfo) - if txsPool.Scrs != nil { - for hash, scrHandler := range txsPool.Scrs { - scrs[hash] = &outport.SCRInfo{ - SmartContractResult: scrHandler.TransactionHandler, - FeeInfo: &scrHandler.FeeInfo, - ExecutionOrder: uint32(scrHandler.ExecutionOrder), - } - } + scrs = d.parseScrs(txsPool.Scrs) } logs := make([]*outport.LogData, 0) - if txsPool.Logs != nil { - for _, logHandler := range txsPool.Logs { - logs = append(logs, &outport.LogData{ - TxHash: logHandler.TxHash, - Log: logHandler.LogHandler, - }) - } + logs = d.parseLogs(txsPool.Logs) } return &outport.TransactionPool{ @@ -114,6 +94,59 @@ func (d *eventsPreProcessorV0) setTransactionsPool(txsPool *data.TransactionsPoo }, nil } +func (d *eventsPreProcessorV0) parseTxs(txs map[string]*data.NodeTransaction) map[string]*outport.TxInfo { + newTxs := make(map[string]*outport.TxInfo, len(txs)) + + for hash, txHandler := range txs { + if txHandler == nil { + continue + } + + newTxs[hash] = &outport.TxInfo{ + Transaction: txHandler.TransactionHandler, + FeeInfo: &txHandler.FeeInfo, + ExecutionOrder: uint32(txHandler.ExecutionOrder), + } + } + + return newTxs +} + +func (d *eventsPreProcessorV0) parseScrs(scrs map[string]*data.NodeSmartContractResult) map[string]*outport.SCRInfo { + newScrs := make(map[string]*outport.SCRInfo, len(scrs)) + + for hash, scrHandler := range scrs { + if scrHandler == nil { + continue + } + + newScrs[hash] = &outport.SCRInfo{ + SmartContractResult: scrHandler.TransactionHandler, + FeeInfo: &scrHandler.FeeInfo, + ExecutionOrder: uint32(scrHandler.ExecutionOrder), + } + } + + return newScrs +} + +func (d *eventsPreProcessorV0) parseLogs(logs []*data.LogData) []*outport.LogData { + newLogs := make([]*outport.LogData, len(logs)) + + for _, logHandler := range logs { + if logHandler == nil { + continue + } + + newLogs = append(newLogs, &outport.LogData{ + TxHash: logHandler.TxHash, + Log: logHandler.LogHandler, + }) + } + + return newLogs +} + func (d *eventsPreProcessorV0) getHeader(marshaledData []byte) (nodeData.HeaderHandler, error) { headerTypeStruct := struct { HeaderType core.HeaderType From 0d71a6c820adac4ef0ef2235ca1d5e947368f879 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Mon, 18 Dec 2023 11:10:31 +0200 Subject: [PATCH 05/14] fixes after review: refactorings --- process/preprocess/eventsPreProcessorV0.go | 28 ++++------------------ 1 file changed, 5 insertions(+), 23 deletions(-) diff --git a/process/preprocess/eventsPreProcessorV0.go b/process/preprocess/eventsPreProcessorV0.go index c819b54..446d806 100644 --- a/process/preprocess/eventsPreProcessorV0.go +++ b/process/preprocess/eventsPreProcessorV0.go @@ -72,31 +72,15 @@ func (d *eventsPreProcessorV0) parseTransactionsPool(txsPool *data.TransactionsP return nil, process.ErrNilTransactionsPool } - txs := make(map[string]*outport.TxInfo) - if txsPool.Txs != nil { - txs = d.parseTxs(txsPool.Txs) - } - - scrs := make(map[string]*outport.SCRInfo) - if txsPool.Scrs != nil { - scrs = d.parseScrs(txsPool.Scrs) - } - - logs := make([]*outport.LogData, 0) - if txsPool.Logs != nil { - logs = d.parseLogs(txsPool.Logs) - } - return &outport.TransactionPool{ - Transactions: txs, - SmartContractResults: scrs, - Logs: logs, + Transactions: d.parseTxs(txsPool.Txs), + SmartContractResults: d.parseScrs(txsPool.Scrs), + Logs: d.parseLogs(txsPool.Logs), }, nil } func (d *eventsPreProcessorV0) parseTxs(txs map[string]*data.NodeTransaction) map[string]*outport.TxInfo { - newTxs := make(map[string]*outport.TxInfo, len(txs)) - + newTxs := make(map[string]*outport.TxInfo) for hash, txHandler := range txs { if txHandler == nil { continue @@ -113,8 +97,7 @@ func (d *eventsPreProcessorV0) parseTxs(txs map[string]*data.NodeTransaction) ma } func (d *eventsPreProcessorV0) parseScrs(scrs map[string]*data.NodeSmartContractResult) map[string]*outport.SCRInfo { - newScrs := make(map[string]*outport.SCRInfo, len(scrs)) - + newScrs := make(map[string]*outport.SCRInfo) for hash, scrHandler := range scrs { if scrHandler == nil { continue @@ -132,7 +115,6 @@ func (d *eventsPreProcessorV0) parseScrs(scrs map[string]*data.NodeSmartContract func (d *eventsPreProcessorV0) parseLogs(logs []*data.LogData) []*outport.LogData { newLogs := make([]*outport.LogData, len(logs)) - for _, logHandler := range logs { if logHandler == nil { continue From f6e6b5b97f237fe687a0f390969f83dbdf26db3a Mon Sep 17 00:00:00 2001 From: ssd04 Date: Thu, 21 Dec 2023 16:14:20 +0200 Subject: [PATCH 06/14] fix field in config --- cmd/notifier/config/config.toml | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/cmd/notifier/config/config.toml b/cmd/notifier/config/config.toml index 7bf514d..f29933c 100644 --- a/cmd/notifier/config/config.toml +++ b/cmd/notifier/config/config.toml @@ -1,4 +1,8 @@ [General] + # CheckDuplicates signals if the events received from observers have been already pushed to clients + # Requires a redis instance/cluster and should be used when multiple observers push from the same shard + CheckDuplicates = true + # ExternalMarshaller is used for handling incoming/outcoming api requests [General.ExternalMarshaller] Type = "json" @@ -13,10 +17,6 @@ Prefix = "erd" Length = 32 - # CheckDuplicates signals if the events received from observers have been already pushed to clients - # Requires a redis instance/cluster and should be used when multiple observers push from the same shard - CheckDuplicates = true - [WebSocketConnector] # Enabled will determine if websocket connector will be enabled or not Enabled = false @@ -44,7 +44,9 @@ AcknowledgeTimeoutInSec = 60 [ConnectorApi] - # Enabled will determine if http connector will be enabled or not + # Enabled will determine if http connector will be enabled or not. + # It will determine if http connector endpoints will be created. + # If set to false, the web server will still be created Enabled = true # The address on which the events notifier listens for subscriptions From 756e5bad309ce50de5ea6136d714d32da5f77e00 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Thu, 21 Dec 2023 16:18:21 +0200 Subject: [PATCH 07/14] update comment --- cmd/notifier/config/config.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/notifier/config/config.toml b/cmd/notifier/config/config.toml index f29933c..2bae139 100644 --- a/cmd/notifier/config/config.toml +++ b/cmd/notifier/config/config.toml @@ -46,7 +46,7 @@ [ConnectorApi] # Enabled will determine if http connector will be enabled or not. # It will determine if http connector endpoints will be created. - # If set to false, the web server will still be created + # If set to false, the web server will still be created for other endpoints (for metrics, or for WS is needed) Enabled = true # The address on which the events notifier listens for subscriptions From a03f032464e6c313d5d70ffb0ea3ac7157bb43f6 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Thu, 21 Dec 2023 16:19:06 +0200 Subject: [PATCH 08/14] update comment --- cmd/notifier/config/config.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/notifier/config/config.toml b/cmd/notifier/config/config.toml index 2bae139..237233a 100644 --- a/cmd/notifier/config/config.toml +++ b/cmd/notifier/config/config.toml @@ -46,7 +46,7 @@ [ConnectorApi] # Enabled will determine if http connector will be enabled or not. # It will determine if http connector endpoints will be created. - # If set to false, the web server will still be created for other endpoints (for metrics, or for WS is needed) + # If set to false, the web server will still be created for other endpoints (for metrics, or for WS if needed) Enabled = true # The address on which the events notifier listens for subscriptions From fc2096024c5a2df7a9a6a2b9d82717a0a4a3e841 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Fri, 22 Dec 2023 11:20:37 +0200 Subject: [PATCH 09/14] toml config unit tests --- cmd/notifier/config/config.toml | 3 + config/tomlConfig_test.go | 273 ++++++++++++++++++++++++++++++++ factory/wsFactory.go | 13 +- 3 files changed, 283 insertions(+), 6 deletions(-) create mode 100644 config/tomlConfig_test.go diff --git a/cmd/notifier/config/config.toml b/cmd/notifier/config/config.toml index 237233a..24ad5c0 100644 --- a/cmd/notifier/config/config.toml +++ b/cmd/notifier/config/config.toml @@ -37,6 +37,9 @@ # Signals if in case of data payload processing error, we should send the ack signal or not BlockingAckOnError = false + # Set to true to drop messages if there is no active WebSocket connection to send to. + DropMessagesIfNoConnection = false + # After a message will be sent it will wait for an ack message if this flag is enabled WithAcknowledge = true diff --git a/config/tomlConfig_test.go b/config/tomlConfig_test.go new file mode 100644 index 0000000..ddf0dc7 --- /dev/null +++ b/config/tomlConfig_test.go @@ -0,0 +1,273 @@ +package config_test + +import ( + "strconv" + "testing" + + "github.com/multiversx/mx-chain-notifier-go/config" + "github.com/pelletier/go-toml" + "github.com/stretchr/testify/require" +) + +func TestMainConfig(t *testing.T) { + t.Parallel() + + generalMarshallerType := "json" + adrConverterType := "bech32" + adrConverterPrefix := "erd" + adrConverterLength := 32 + checkDuplicates := true + + connectorAPIHost := "5000" + connectorAPIUsername := "guest" + connectorAPIPassword := "guest" + + redisURL := "redis://localhost:6379/0" + redisMasterName := "mymaster" + redisSentinelURL := "localhost:26379" + redisConnectionType := "sentinel" + redisTTL := 30 + + rabbitmqURL := "amqp://guest:guest@localhost:5672" + rabbitmqEventsExchangeName := "all_events" + rabbitmqEventsExchangeType := "fanout" + rabbitmqRevertExchangeName := "revert_events" + rabbitmqRevertExchangeType := "fanout" + + wsConnURL := "localhost:22111" + wsConnMode := "server" + wsConnRetryDurationInSec := 5 + wsConnAckTimeoutInSec := 60 + wsConnMarshallerType := "gogo protobuf" + + expectedConfig := config.MainConfig{ + General: config.GeneralConfig{ + ExternalMarshaller: config.MarshallerConfig{ + Type: generalMarshallerType, + }, + AddressConverter: config.AddressConverterConfig{ + Type: adrConverterType, + Prefix: adrConverterPrefix, + Length: adrConverterLength, + }, + CheckDuplicates: checkDuplicates, + }, + WebSocketConnector: config.WebSocketConfig{ + Enabled: true, + URL: wsConnURL, + Mode: wsConnMode, + RetryDurationInSec: wsConnRetryDurationInSec, + AcknowledgeTimeoutInSec: wsConnAckTimeoutInSec, + WithAcknowledge: true, + BlockingAckOnError: false, + DropMessagesIfNoConnection: false, + DataMarshallerType: wsConnMarshallerType, + }, + ConnectorApi: config.ConnectorApiConfig{ + Enabled: true, + Host: connectorAPIHost, + Username: connectorAPIUsername, + Password: connectorAPIPassword, + }, + Redis: config.RedisConfig{ + Url: redisURL, + MasterName: redisMasterName, + SentinelUrl: redisSentinelURL, + ConnectionType: redisConnectionType, + TTL: uint32(redisTTL), + }, + RabbitMQ: config.RabbitMQConfig{ + Url: rabbitmqURL, + EventsExchange: config.RabbitMQExchangeConfig{ + Name: rabbitmqEventsExchangeName, + Type: rabbitmqEventsExchangeType, + }, + RevertEventsExchange: config.RabbitMQExchangeConfig{ + Name: rabbitmqRevertExchangeName, + Type: rabbitmqRevertExchangeType, + }, + }, + } + + testString := ` +[General] + # CheckDuplicates signals if the events received from observers have been already pushed to clients + # Requires a redis instance/cluster and should be used when multiple observers push from the same shard + CheckDuplicates = true + + # ExternalMarshaller is used for handling incoming/outcoming api requests + [General.ExternalMarshaller] + Type = "` + generalMarshallerType + `" + # InternalMarshaller is used for handling internal structs + # This has to be mapped with the internal marshalling used for notifier outport driver + [General.InternalMarshaller] + Type = "` + generalMarshallerType + `" + + # Address pubkey converter config options + [General.AddressConverter] + Type = "` + adrConverterType + `" + Prefix = "` + adrConverterPrefix + `" + Length = ` + strconv.Itoa(adrConverterLength) + ` + +[WebSocketConnector] + # Enabled will determine if websocket connector will be enabled or not + Enabled = true + + # URL for the WebSocket client/server connection + # This value represents the IP address and port number that the WebSocket client or server will use to establish a connection. + URL = "` + wsConnURL + `" + + # This flag describes the mode to start the WebSocket connector. Can be "client" or "server" + Mode = "` + wsConnMode + `" + + # Possible values: json, gogo protobuf. Should be compatible with mx-chain-node outport driver config + DataMarshallerType = "` + wsConnMarshallerType + `" + + # Retry duration (receive/send ack signal) in seconds + RetryDurationInSec = ` + strconv.Itoa(wsConnRetryDurationInSec) + ` + + # Signals if in case of data payload processing error, we should send the ack signal or not + BlockingAckOnError = false + + # Set to true to drop messages if there is no active WebSocket connection to send to. + DropMessagesIfNoConnection = false + + # After a message will be sent it will wait for an ack message if this flag is enabled + WithAcknowledge = true + + # The duration in seconds to wait for an acknowledgment message, after this time passes an error will be returned + AcknowledgeTimeoutInSec = ` + strconv.Itoa(wsConnAckTimeoutInSec) + ` + +[ConnectorApi] + # Enabled will determine if http connector will be enabled or not. + # It will determine if http connector endpoints will be created. + # If set to false, the web server will still be created for other endpoints (for metrics, or for WS if needed) + Enabled = true + + # The address on which the events notifier listens for subscriptions + # It can be specified as "localhost:5000" or only as "5000" + Host = "` + connectorAPIHost + `" + + # Username and Password needed to authorize the connector + # BasicAuth is enabled only for the endpoints with "Auth" flag enabled + # in api.toml config file + Username = "` + connectorAPIUsername + `" + Password = "` + connectorAPIPassword + `" + +[Redis] + # The url used to connect to a pubsub server + Url = "` + redisURL + `" + + # The master name for failover client + MasterName = "` + redisMasterName + `" + + # The sentinel url for failover client + SentinelUrl = "` + redisSentinelURL + `" + + # The redis connection type. Options: | instance | sentinel | + # instance - it will try to connect to a single redis instance + # sentinel - it will try to connect to redis setup with master, slave and sentinel instances + ConnectionType = "` + redisConnectionType + `" + + # Time to live (in minutes) for redis lock entry + TTL = ` + strconv.Itoa(redisTTL) + ` + +[RabbitMQ] + # The url used to connect to a rabbitMQ server + # Note: not required for running in the notifier mode + Url = "` + rabbitmqURL + `" + + # The exchange which holds all logs and events + [RabbitMQ.EventsExchange] + Name = "` + rabbitmqEventsExchangeName + `" + Type = "` + rabbitmqEventsExchangeType + `" + + # The exchange which holds revert events + [RabbitMQ.RevertEventsExchange] + Name = "` + rabbitmqRevertExchangeName + `" + Type = "` + rabbitmqRevertExchangeType + `" + ` + + config := config.MainConfig{} + + err := toml.Unmarshal([]byte(testString), &config) + require.Nil(t, err) + require.Equal(t, expectedConfig, config) +} + +func TestAPIConfig(t *testing.T) { + t.Parallel() + + expectedAPIConfig := config.APIRoutesConfig{ + APIPackages: map[string]config.APIPackageConfig{ + "events": { + Routes: []config.RouteConfig{ + { + Name: "/push", + Open: true, + Auth: false, + }, + { + Name: "/revert", + Open: true, + Auth: false, + }, + { + Name: "/finalized", + Open: true, + Auth: false, + }, + }, + }, + "hub": { + Routes: []config.RouteConfig{ + { + Name: "/ws", + Open: true, + }, + }, + }, + "status": { + Routes: []config.RouteConfig{ + { + Name: "/metrics", + Open: true, + }, + { + Name: "/prometheus-metrics", + Open: true, + }, + }, + }, + }, + } + + testString := ` +# API routes configuration +[APIPackages] + +[APIPackages.events] + Routes = [ + { Name = "/push", Open = true, Auth = false }, + { Name = "/revert", Open = true, Auth = false }, + { Name = "/finalized", Open = true, Auth = false }, + ] + +[APIPackages.hub] + Routes = [ + { Name = "/ws", Open = true }, + ] + +[APIPackages.status] + Routes = [ + { Name = "/metrics", Open = true }, + { Name = "/prometheus-metrics", Open = true }, + ] + ` + + config := config.APIRoutesConfig{} + + err := toml.Unmarshal([]byte(testString), &config) + require.Nil(t, err) + require.Equal(t, expectedAPIConfig, config) +} diff --git a/factory/wsFactory.go b/factory/wsFactory.go index 7d9272c..1a19fd9 100644 --- a/factory/wsFactory.go +++ b/factory/wsFactory.go @@ -86,12 +86,13 @@ func createWsObsConnector( func createWsHost(wsConfig config.WebSocketConfig, wsMarshaller marshal.Marshalizer) (factoryHost.FullDuplexHost, error) { return factoryHost.CreateWebSocketHost(factoryHost.ArgsWebSocketHost{ WebSocketConfig: data.WebSocketConfig{ - URL: wsConfig.URL, - WithAcknowledge: wsConfig.WithAcknowledge, - Mode: wsConfig.Mode, - RetryDurationInSec: int(wsConfig.RetryDurationInSec), - BlockingAckOnError: wsConfig.BlockingAckOnError, - AcknowledgeTimeoutInSec: wsConfig.AcknowledgeTimeoutInSec, + URL: wsConfig.URL, + WithAcknowledge: wsConfig.WithAcknowledge, + Mode: wsConfig.Mode, + RetryDurationInSec: int(wsConfig.RetryDurationInSec), + BlockingAckOnError: wsConfig.BlockingAckOnError, + AcknowledgeTimeoutInSec: wsConfig.AcknowledgeTimeoutInSec, + DropMessagesIfNoConnection: wsConfig.DropMessagesIfNoConnection, }, Marshaller: wsMarshaller, Log: log, From 7be18c980cdc5aed1513601f856a619397fef42e Mon Sep 17 00:00:00 2001 From: ssd04 Date: Tue, 9 Jan 2024 10:42:05 +0200 Subject: [PATCH 10/14] update build with flags --- Makefile | 2 +- README.md | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index b7f15be..bc34023 100644 --- a/Makefile +++ b/Makefile @@ -31,7 +31,7 @@ help: build: cd ${cmd_dir} && \ - go build -v -ldflags="-X main.appVersion=$(git describe --tags --long --dirty)" -o ${binary} + go build -v -ldflags="-X main.appVersion=$(shell git describe --tags --long --dirty)" -o ${binary} publisher_type="rabbitmq" run: build diff --git a/README.md b/README.md index fc2e756..c7e34ee 100644 --- a/README.md +++ b/README.md @@ -26,13 +26,15 @@ deprecated in the future. Using the `cmd/notifier` package as root, execute the following commands: - install go dependencies: `go install` -- build executable: `go build -o event-notifier` +- build executable: `go build -ldflags="-X main.appVersion=$(git describe --tags --long --dirty)" -o event-notifier` - run `./event-notifier` --- + This can also be done using a single command from `Makefile`: ```bash # by default, rabbitmq type +# `run` command will also trigger make `build` command make run # specify notifier running mode (eq: rabbitmq, ws) From 10bf77eb235d74684fb76b6d08a748e5d1a0fd0d Mon Sep 17 00:00:00 2001 From: ssd04 Date: Tue, 9 Jan 2024 10:56:32 +0200 Subject: [PATCH 11/14] added build script --- README.md | 5 +++++ scripts/build.sh | 21 +++++++++++++++++++++ 2 files changed, 26 insertions(+) create mode 100644 scripts/build.sh diff --git a/README.md b/README.md index c7e34ee..edb7607 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,11 @@ Using the `cmd/notifier` package as root, execute the following commands: - build executable: `go build -ldflags="-X main.appVersion=$(git describe --tags --long --dirty)" -o event-notifier` - run `./event-notifier` +Or use the build script: +```bash +bash scripts/build.sh +``` + --- This can also be done using a single command from `Makefile`: diff --git a/scripts/build.sh b/scripts/build.sh new file mode 100644 index 0000000..8532e00 --- /dev/null +++ b/scripts/build.sh @@ -0,0 +1,21 @@ +#!/usr/bin/env bash + +# set -x + +cmd_dir="cmd/notifier" +binary=event-notifier + +if ! [ -x "$(command -v go)" ]; then + echo -e "go has to be installed" + exit 1 +fi + +if ! [ -x "$(command -v git)" ]; then + echo -e "git has to be installed" + exit 1 +fi + +cd ${cmd_dir} && \ + go build -v \ + -ldflags="-X main.appVersion=$(git describe --tags --long --dirty)" \ + -o ${binary} From 5758e28b3f9d4a652f6b21790a91a7de81fb6829 Mon Sep 17 00:00:00 2001 From: ssd04 Date: Wed, 31 Jan 2024 17:45:55 +0200 Subject: [PATCH 12/14] remove nginx config file --- tools/nginx/nginx.conf | 3 --- 1 file changed, 3 deletions(-) delete mode 100644 tools/nginx/nginx.conf diff --git a/tools/nginx/nginx.conf b/tools/nginx/nginx.conf deleted file mode 100644 index b1cfac1..0000000 --- a/tools/nginx/nginx.conf +++ /dev/null @@ -1,3 +0,0 @@ -location /some/path/ { - proxy_pass http://www.example.com/link/; -} From d40b68fa8d5a58b61bd39b427766334700fb8f5a Mon Sep 17 00:00:00 2001 From: ssd04 Date: Wed, 31 Jan 2024 17:50:36 +0200 Subject: [PATCH 13/14] revert tests to use latest version --- tools/httpConnector/httpClientWrapper.go | 2 +- tools/httpConnector/main.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tools/httpConnector/httpClientWrapper.go b/tools/httpConnector/httpClientWrapper.go index 9471a41..13dcf5a 100644 --- a/tools/httpConnector/httpClientWrapper.go +++ b/tools/httpConnector/httpClientWrapper.go @@ -24,7 +24,7 @@ const ( contentTypeKey = "Content-Type" contentTypeValue = "application/json" payloadVersionKey = "version" - payloadVersionValue = "0" + payloadVersionValue = "1" ) type httpClientWrapper struct { diff --git a/tools/httpConnector/main.go b/tools/httpConnector/main.go index f3e4c2d..f1c8d64 100644 --- a/tools/httpConnector/main.go +++ b/tools/httpConnector/main.go @@ -26,19 +26,19 @@ func main() { return } - err = httpClient.Post("/events/push", blockData.OutportBlockV0()) + err = httpClient.Post("/events/push", blockData.OutportBlockV1()) if err != nil { fmt.Println(fmt.Errorf("%w in eventNotifier.SaveBlock while posting block data", err)) return } - err = httpClient.Post("/events/revert", blockData.RevertBlockV0()) + err = httpClient.Post("/events/revert", blockData.RevertBlockV1()) if err != nil { fmt.Println(fmt.Errorf("%w in eventNotifier.SaveBlock while posting block data", err)) return } - err = httpClient.Post("/events/finalized", blockData.FinalizedBlockV0()) + err = httpClient.Post("/events/finalized", blockData.FinalizedBlockV1()) if err != nil { fmt.Println(fmt.Errorf("%w in eventNotifier.SaveBlock while posting block data", err)) return From 8e6c352411d0d8c2882b0d9d9437d77666d21c6b Mon Sep 17 00:00:00 2001 From: ssd04 Date: Wed, 31 Jan 2024 17:54:44 +0200 Subject: [PATCH 14/14] fix test after merge conflicts --- process/preprocess/eventsPreProcessorV0_test.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/process/preprocess/eventsPreProcessorV0_test.go b/process/preprocess/eventsPreProcessorV0_test.go index 8f1f464..f026650 100644 --- a/process/preprocess/eventsPreProcessorV0_test.go +++ b/process/preprocess/eventsPreProcessorV0_test.go @@ -6,6 +6,7 @@ import ( "testing" coreData "github.com/multiversx/mx-chain-core-go/data" + "github.com/multiversx/mx-chain-core-go/marshal" "github.com/multiversx/mx-chain-notifier-go/data" notifierData "github.com/multiversx/mx-chain-notifier-go/data" "github.com/multiversx/mx-chain-notifier-go/mocks" @@ -17,10 +18,14 @@ import ( func TestPreProcessorV0_SaveBlock(t *testing.T) { t.Parallel() + marshaller := &marshal.JsonMarshalizer{} + blockData, err := testdata.NewBlockData(marshaller) + require.Nil(t, err) + t.Run("nil block data", func(t *testing.T) { t.Parallel() - outportBlock := testdata.OutportBlockV0() + outportBlock := blockData.OutportBlockV0() outportBlock.HeaderType = "invalid" marshalledBlock, _ := json.Marshal(outportBlock) @@ -43,7 +48,7 @@ func TestPreProcessorV0_SaveBlock(t *testing.T) { }, } - outportBlock := testdata.OutportBlockV0() + outportBlock := blockData.OutportBlockV0() dp, err := preprocess.NewEventsPreProcessorV0(args) require.Nil(t, err) @@ -69,7 +74,7 @@ func TestPreProcessorV0_SaveBlock(t *testing.T) { dp, err := preprocess.NewEventsPreProcessorV0(args) require.Nil(t, err) - outportBlock := testdata.OutportBlockV0() + outportBlock := blockData.OutportBlockV0() marshalledBlock, err := json.Marshal(outportBlock) require.Nil(t, err)