Skip to content

Commit

Permalink
Merge pull request #87 from multiversx/fix-old-data-unmarshall
Browse files Browse the repository at this point in the history
Fix old data unmarshalling
  • Loading branch information
ssd04 authored Dec 20, 2023
2 parents fb078a9 + 0d71a6c commit 989e11c
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 143 deletions.
18 changes: 17 additions & 1 deletion data/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/multiversx/mx-chain-core-go/core"
nodeData "github.com/multiversx/mx-chain-core-go/data"
"github.com/multiversx/mx-chain-core-go/data/alteredAccount"
"github.com/multiversx/mx-chain-core-go/data/block"
"github.com/multiversx/mx-chain-core-go/data/outport"
"github.com/multiversx/mx-chain-core-go/data/receipt"
"github.com/multiversx/mx-chain-core-go/data/rewardTx"
Expand Down Expand Up @@ -44,10 +45,25 @@ type ArgsSaveBlockData struct {
NumberOfShards uint32
}

// OutportBlockDataOld holds the block data that will be received on push events
// TODO: remove on next iterations, new versions will use outport driver structs from
// core repository, which will be backwards compatible from now on
type OutportBlockDataOld struct {
HeaderHash []byte
Body *block.Body
TransactionsPool *TransactionsPool
SignersIndexes []uint64
NotarizedHeadersHashes []string
HeaderGasConsumption outport.HeaderGasConsumption
AlteredAccounts map[string]*alteredAccount.AlteredAccount
NumberOfShards uint32
IsImportDB bool
}

// ArgsSaveBlock holds block data with header type
type ArgsSaveBlock struct {
HeaderType core.HeaderType
ArgsSaveBlockData
OutportBlockDataOld
}

// LogData holds the data needed for indexing logs and events
Expand Down
99 changes: 78 additions & 21 deletions process/preprocess/eventsPreProcessorV0.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
"github.com/multiversx/mx-chain-core-go/core"
coreData "github.com/multiversx/mx-chain-core-go/data"
nodeData "github.com/multiversx/mx-chain-core-go/data"
"github.com/multiversx/mx-chain-core-go/data/alteredAccount"
"github.com/multiversx/mx-chain-core-go/data/block"
"github.com/multiversx/mx-chain-core-go/data/outport"
"github.com/multiversx/mx-chain-notifier-go/data"
"github.com/multiversx/mx-chain-notifier-go/process"
)

// TODO: dismiss this implementation after http integration is fully deprecated
Expand All @@ -31,18 +31,8 @@ func NewEventsPreProcessorV0(args ArgsEventsPreProcessor) (*eventsPreProcessorV0

// SaveBlock will handle the block info data
func (d *eventsPreProcessorV0) SaveBlock(marshalledData []byte) error {
argsBlockS := struct {
HeaderHash []byte
Body *block.Body
TransactionsPool *outport.TransactionPool
SignersIndexes []uint64
NotarizedHeadersHashes []string
HeaderGasConsumption outport.HeaderGasConsumption
AlteredAccounts map[string]*alteredAccount.AlteredAccount
NumberOfShards uint32
IsImportDB bool
}{}
err := json.Unmarshal(marshalledData, &argsBlockS)
blockData := &data.OutportBlockDataOld{}
err := json.Unmarshal(marshalledData, blockData)
if err != nil {
return err
}
Expand All @@ -52,15 +42,20 @@ func (d *eventsPreProcessorV0) SaveBlock(marshalledData []byte) error {
return err
}

txsPool, err := d.parseTransactionsPool(blockData.TransactionsPool)
if err != nil {
return err
}

saveBlockData := &data.ArgsSaveBlockData{
HeaderHash: argsBlockS.HeaderHash,
Body: argsBlockS.Body,
SignersIndexes: argsBlockS.SignersIndexes,
NotarizedHeadersHashes: argsBlockS.NotarizedHeadersHashes,
HeaderGasConsumption: &argsBlockS.HeaderGasConsumption,
AlteredAccounts: argsBlockS.AlteredAccounts,
NumberOfShards: argsBlockS.NumberOfShards,
TransactionsPool: argsBlockS.TransactionsPool,
HeaderHash: blockData.HeaderHash,
Body: blockData.Body,
SignersIndexes: blockData.SignersIndexes,
NotarizedHeadersHashes: blockData.NotarizedHeadersHashes,
HeaderGasConsumption: &blockData.HeaderGasConsumption,
AlteredAccounts: blockData.AlteredAccounts,
NumberOfShards: blockData.NumberOfShards,
TransactionsPool: txsPool,
Header: header,
}

Expand All @@ -72,6 +67,68 @@ func (d *eventsPreProcessorV0) SaveBlock(marshalledData []byte) error {
return nil
}

func (d *eventsPreProcessorV0) parseTransactionsPool(txsPool *data.TransactionsPool) (*outport.TransactionPool, error) {
if txsPool == nil {
return nil, process.ErrNilTransactionsPool
}

return &outport.TransactionPool{
Transactions: d.parseTxs(txsPool.Txs),
SmartContractResults: d.parseScrs(txsPool.Scrs),
Logs: d.parseLogs(txsPool.Logs),
}, nil
}

func (d *eventsPreProcessorV0) parseTxs(txs map[string]*data.NodeTransaction) map[string]*outport.TxInfo {
newTxs := make(map[string]*outport.TxInfo)
for hash, txHandler := range txs {
if txHandler == nil {
continue
}

newTxs[hash] = &outport.TxInfo{
Transaction: txHandler.TransactionHandler,
FeeInfo: &txHandler.FeeInfo,
ExecutionOrder: uint32(txHandler.ExecutionOrder),
}
}

return newTxs
}

func (d *eventsPreProcessorV0) parseScrs(scrs map[string]*data.NodeSmartContractResult) map[string]*outport.SCRInfo {
newScrs := make(map[string]*outport.SCRInfo)
for hash, scrHandler := range scrs {
if scrHandler == nil {
continue
}

newScrs[hash] = &outport.SCRInfo{
SmartContractResult: scrHandler.TransactionHandler,
FeeInfo: &scrHandler.FeeInfo,
ExecutionOrder: uint32(scrHandler.ExecutionOrder),
}
}

return newScrs
}

func (d *eventsPreProcessorV0) parseLogs(logs []*data.LogData) []*outport.LogData {
newLogs := make([]*outport.LogData, len(logs))
for _, logHandler := range logs {
if logHandler == nil {
continue
}

newLogs = append(newLogs, &outport.LogData{
TxHash: logHandler.TxHash,
Log: logHandler.LogHandler,
})
}

return newLogs
}

func (d *eventsPreProcessorV0) getHeader(marshaledData []byte) (nodeData.HeaderHandler, error) {
headerTypeStruct := struct {
HeaderType core.HeaderType
Expand Down
87 changes: 19 additions & 68 deletions process/preprocess/eventsPreProcessorV0_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,11 @@ import (
"testing"

coreData "github.com/multiversx/mx-chain-core-go/data"
"github.com/multiversx/mx-chain-core-go/data/block"
"github.com/multiversx/mx-chain-core-go/data/outport"
"github.com/multiversx/mx-chain-core-go/data/smartContractResult"
"github.com/multiversx/mx-chain-core-go/data/transaction"
"github.com/multiversx/mx-chain-notifier-go/data"
notifierData "github.com/multiversx/mx-chain-notifier-go/data"
"github.com/multiversx/mx-chain-notifier-go/mocks"
"github.com/multiversx/mx-chain-notifier-go/process/preprocess"
"github.com/multiversx/mx-chain-notifier-go/testdata"
"github.com/stretchr/testify/require"
)

Expand All @@ -23,7 +20,7 @@ func TestPreProcessorV0_SaveBlock(t *testing.T) {
t.Run("nil block data", func(t *testing.T) {
t.Parallel()

outportBlock := createDefaultOutportBlockV0()
outportBlock := testdata.OutportBlockV0()
outportBlock.HeaderType = "invalid"
marshalledBlock, _ := json.Marshal(outportBlock)

Expand All @@ -46,7 +43,7 @@ func TestPreProcessorV0_SaveBlock(t *testing.T) {
},
}

outportBlock := createDefaultOutportBlockV0()
outportBlock := testdata.OutportBlockV0()

dp, err := preprocess.NewEventsPreProcessorV0(args)
require.Nil(t, err)
Expand All @@ -59,14 +56,27 @@ func TestPreProcessorV0_SaveBlock(t *testing.T) {
t.Run("should work", func(t *testing.T) {
t.Parallel()

dp, err := preprocess.NewEventsPreProcessorV0(createMockEventsDataPreProcessorArgs())
args := createMockEventsDataPreProcessorArgs()

wasCalled := false
args.Facade = &mocks.FacadeStub{
HandlePushEventsV2Called: func(events data.ArgsSaveBlockData) error {
wasCalled = true
return nil
},
}

dp, err := preprocess.NewEventsPreProcessorV0(args)
require.Nil(t, err)

outportBlock := createDefaultOutportBlockV0()
outportBlock := testdata.OutportBlockV0()

marshalledBlock, _ := json.Marshal(outportBlock)
marshalledBlock, err := json.Marshal(outportBlock)
require.Nil(t, err)
err = dp.SaveBlock(marshalledBlock)
require.Nil(t, err)

require.True(t, wasCalled)
})
}

Expand Down Expand Up @@ -102,62 +112,3 @@ func TestPreProcessorV0_FinalizedBlock(t *testing.T) {
err = dp.FinalizedBlock(marshalledBlock)
require.Nil(t, err)
}

func createDefaultOutportBlockV0() *notifierData.ArgsSaveBlock {
outportBlock := &notifierData.ArgsSaveBlock{
HeaderType: "Header",
ArgsSaveBlockData: notifierData.ArgsSaveBlockData{
HeaderHash: []byte("headerHash3"),
Body: &block.Body{
MiniBlocks: []*block.MiniBlock{
{
TxHashes: [][]byte{},
ReceiverShardID: 1,
SenderShardID: 1,
},
},
},
TransactionsPool: &outport.TransactionPool{
Transactions: map[string]*outport.TxInfo{
"txHash1": {
Transaction: &transaction.Transaction{
Nonce: 1,
GasPrice: 1,
GasLimit: 1,
},
FeeInfo: &outport.FeeInfo{
GasUsed: 1,
},
ExecutionOrder: 2,
},
},
SmartContractResults: map[string]*outport.SCRInfo{
"scrHash1": {
SmartContractResult: &smartContractResult.SmartContractResult{
Nonce: 2,
GasLimit: 2,
GasPrice: 2,
CallType: 2,
},
FeeInfo: &outport.FeeInfo{
GasUsed: 2,
},
ExecutionOrder: 0,
},
},
Logs: []*outport.LogData{
{
Log: &transaction.Log{
Address: []byte("logaddr1"),
Events: []*transaction.Event{},
},
TxHash: "logHash1",
},
},
},
NumberOfShards: 2,
},
}

return outportBlock
}
Loading

0 comments on commit 989e11c

Please sign in to comment.