diff --git a/data.go b/data.go index afb9879e..069602d3 100644 --- a/data.go +++ b/data.go @@ -138,9 +138,9 @@ type AccountInfo struct { // AccountBalanceHistory represents an entry in the user accounts balances history type AccountBalanceHistory struct { - Address string `json:"address"` - Timestamp int64 `json:"timestamp"` - Balance string `json:"balance"` + Address string `json:"address"` + Timestamp time.Duration `json:"timestamp"` + Balance string `json:"balance"` } // ValidatorsRatingInfo is a structure containing validators information diff --git a/dataIndexer.go b/dataIndexer.go index 4deccb2b..647b8972 100644 --- a/dataIndexer.go +++ b/dataIndexer.go @@ -161,8 +161,8 @@ func (di *dataIndexer) UpdateTPS(tpsBenchmark statistics.TPSBenchmark) { } // SaveAccounts will save the provided accounts -func (di *dataIndexer) SaveAccounts(accounts []state.UserAccountHandler) { - wi := workItems.NewItemAccounts(di.elasticProcessor, accounts) +func (di *dataIndexer) SaveAccounts(timestamp uint64, accounts []state.UserAccountHandler) { + wi := workItems.NewItemAccounts(di.elasticProcessor, timestamp, accounts) di.dispatcher.Add(wi) } diff --git a/elasticProcessor.go b/elasticProcessor.go index 5d95bfa8..c7b4ee98 100644 --- a/elasticProcessor.go +++ b/elasticProcessor.go @@ -365,7 +365,7 @@ func (ei *elasticProcessor) SaveTransactions( } } - return ei.indexAlteredAccounts(alteredAccounts) + return ei.indexAlteredAccounts(header.GetTimeStamp(), alteredAccounts) } // SaveShardStatistics will prepare and save information about a shard statistics in elasticsearch server @@ -492,7 +492,7 @@ func (ei *elasticProcessor) SaveRoundsInfo(infos []workItems.RoundInfo) error { return ei.elasticClient.DoBulkRequest(&buff, roundIndex) } -func (ei *elasticProcessor) indexAlteredAccounts(accounts map[string]struct{}) error { +func (ei *elasticProcessor) indexAlteredAccounts(blockTimestamp uint64, accounts map[string]struct{}) error { if !ei.isIndexEnabled(accountsIndex) { return nil } @@ -529,11 +529,11 @@ func (ei *elasticProcessor) indexAlteredAccounts(accounts map[string]struct{}) e return nil } - return ei.SaveAccounts(accountsToIndex) + return ei.SaveAccounts(blockTimestamp, accountsToIndex) } // SaveAccounts will prepare and save information about provided accounts in elasticsearch server -func (ei *elasticProcessor) SaveAccounts(accounts []state.UserAccountHandler) error { +func (ei *elasticProcessor) SaveAccounts(blockTimestamp uint64, accounts []state.UserAccountHandler) error { if !ei.isIndexEnabled(accountsIndex) { return nil } @@ -563,23 +563,22 @@ func (ei *elasticProcessor) SaveAccounts(accounts []state.UserAccountHandler) er } } - return ei.saveAccountsHistory(accountsMap) + return ei.saveAccountsHistory(blockTimestamp, accountsMap) } -func (ei *elasticProcessor) saveAccountsHistory(accountsInfoMap map[string]*AccountInfo) error { +func (ei *elasticProcessor) saveAccountsHistory(blockTimestamp uint64, accountsInfoMap map[string]*AccountInfo) error { if !ei.isIndexEnabled(accountsHistoryIndex) { return nil } - currentTimestamp := time.Now().Unix() accountsMap := make(map[string]*AccountBalanceHistory) for address, userAccount := range accountsInfoMap { acc := &AccountBalanceHistory{ Address: address, Balance: userAccount.Balance, - Timestamp: currentTimestamp, + Timestamp: time.Duration(blockTimestamp), } - addressKey := fmt.Sprintf("%s_%d", address, currentTimestamp) + addressKey := fmt.Sprintf("%s_%d", address, blockTimestamp) accountsMap[addressKey] = acc } diff --git a/go.mod b/go.mod index dc946caf..fc4b798b 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/ElrondNetwork/elastic-indexer-go go 1.15 require ( - github.com/ElrondNetwork/elrond-go v1.1.28-0.20210217095315-1dac606233cf + github.com/ElrondNetwork/elrond-go v1.1.28-0.20210223140840-1bf7cdc66a37 github.com/ElrondNetwork/elrond-go-logger v1.0.4 github.com/elastic/go-elasticsearch/v7 v7.10.0 github.com/stretchr/testify v1.7.0 diff --git a/go.sum b/go.sum index 99a484a0..a5f8b5f9 100644 --- a/go.sum +++ b/go.sum @@ -27,6 +27,8 @@ github.com/ElrondNetwork/concurrent-map v0.1.3 h1:j2LtPrNJuerannC1cQDE79STvi/P04 github.com/ElrondNetwork/concurrent-map v0.1.3/go.mod h1:3XwSwn4JHI0lrKxWLZvtp53Emr8BXYTmNQGwcukHJEE= github.com/ElrondNetwork/elastic-indexer-go v0.0.0-20210205133807-18493825ad60/go.mod h1:ta15pl+Ylin1MyuvbfJoSekIjgp+MEXcD8UnWPmtZuM= github.com/ElrondNetwork/elastic-indexer-go v0.0.0-20210216113404-43866fbd8396/go.mod h1:WVamXFwP3zT6sQoSVeYfdwH05ayzTvreED4Dst4Qkdg= +github.com/ElrondNetwork/elastic-indexer-go v1.0.0/go.mod h1:OGSmPbjG3DncL6xXoXIbyYvF38yTgyzeTL1syJel9w0= +github.com/ElrondNetwork/elastic-indexer-go v1.0.1-0.20210223135710-293bb7ce1956/go.mod h1:5yn0oHWYlTiotK2qMpb3B7Cib/Fj3un1SW+tPoga4D4= github.com/ElrondNetwork/elrond-go v1.1.0/go.mod h1:R5HRN9NnnJ0gRd4QogmbUbQNTEczLsFCtfIgd3+3cYY= github.com/ElrondNetwork/elrond-go v1.1.6-0.20201113091116-de0c9a4e63c6/go.mod h1:dUNMqGg/jqTgHbnNxAmW7WkApWXlCQ8brVPbwIEFghI= github.com/ElrondNetwork/elrond-go v1.1.6-0.20201113120119-5f3406f2d6b5/go.mod h1:JsrmzX05L/GHE/nOeQuJCKHasJnZN2WMFr+Y6jjvHGg= @@ -38,6 +40,10 @@ github.com/ElrondNetwork/elrond-go v1.1.28-0.20210211124017-b6038647eb13 h1:QFfw github.com/ElrondNetwork/elrond-go v1.1.28-0.20210211124017-b6038647eb13/go.mod h1:wriFQjR/rABM+HkhPB8BA7qRVyOmPcVXJA5/ThqJ0hc= github.com/ElrondNetwork/elrond-go v1.1.28-0.20210217095315-1dac606233cf h1:lDzF/+Tkrc4xO/6TgUMY5li0uYmFgttBsGsMxN81FAw= github.com/ElrondNetwork/elrond-go v1.1.28-0.20210217095315-1dac606233cf/go.mod h1:gkJB13DqJ5pbzhPUM0iocWVB6LRUEnZZYptC3z3jLzs= +github.com/ElrondNetwork/elrond-go v1.1.28-0.20210223113334-ca37e112bec9 h1:owfjul9Wu0aHgeJk+g1JkAKB2xZwNU6mNuarCqmQ9ME= +github.com/ElrondNetwork/elrond-go v1.1.28-0.20210223113334-ca37e112bec9/go.mod h1:OI98prLTyuTK2Yglm3lX2DcobtiY61JRKLVu/GxeHCo= +github.com/ElrondNetwork/elrond-go v1.1.28-0.20210223140840-1bf7cdc66a37 h1:fY1eqts0HbfUlypV4PDf7lczyRiY2SGdgy+qEu0nzdc= +github.com/ElrondNetwork/elrond-go v1.1.28-0.20210223140840-1bf7cdc66a37/go.mod h1:n12dVuNcz8xMm6X9jI81GZqW9Z1dYWTIK0Osfo+YMVE= github.com/ElrondNetwork/elrond-go-logger v1.0.2/go.mod h1:e5D+c97lKUfFdAzFX7rrI2Igl/z4Y0RkKYKWyzprTGk= github.com/ElrondNetwork/elrond-go-logger v1.0.4 h1:i5Yu4qyjTZDwvBY/ykbNpp2SP9jxwk/QTivRwSZSTAQ= github.com/ElrondNetwork/elrond-go-logger v1.0.4/go.mod h1:e5D+c97lKUfFdAzFX7rrI2Igl/z4Y0RkKYKWyzprTGk= @@ -347,6 +353,7 @@ github.com/libp2p/go-libp2p-core v0.6.0/go.mod h1:txwbVEhHEXikXn9gfC7/UDDw7rkxuX github.com/libp2p/go-libp2p-core v0.6.1/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-core v0.7.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= +github.com/libp2p/go-libp2p-core v0.8.5/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8= github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI= github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg= github.com/libp2p/go-libp2p-discovery v0.3.0/go.mod h1:o03drFnz9BVAZdzC/QUQ+NeQOu38Fu7LJGEOK2gQltw= diff --git a/interface.go b/interface.go index eabb3d5b..4db18869 100644 --- a/interface.go +++ b/interface.go @@ -33,7 +33,7 @@ type ElasticProcessor interface { SaveRoundsInfo(infos []workItems.RoundInfo) error SaveShardValidatorsPubKeys(shardID, epoch uint32, shardValidatorsPubKeys [][]byte) error SetTxLogsProcessor(txLogsProc process.TransactionLogProcessorDatabase) - SaveAccounts(accounts []state.UserAccountHandler) error + SaveAccounts(blockTimestamp uint64, accounts []state.UserAccountHandler) error IsInterfaceNil() bool } diff --git a/mock/elasticProcessorStub.go b/mock/elasticProcessorStub.go index abcda5e2..0bb53f0d 100644 --- a/mock/elasticProcessorStub.go +++ b/mock/elasticProcessorStub.go @@ -21,7 +21,7 @@ type ElasticProcessorStub struct { SaveRoundsInfoCalled func(infos []workItems.RoundInfo) error SaveShardValidatorsPubKeysCalled func(shardID, epoch uint32, shardValidatorsPubKeys [][]byte) error SetTxLogsProcessorCalled func(txLogsProc process.TransactionLogProcessorDatabase) - SaveAccountsCalled func(acc []state.UserAccountHandler) error + SaveAccountsCalled func(timestamp uint64, acc []state.UserAccountHandler) error } // SaveShardStatistics - @@ -104,9 +104,9 @@ func (eim *ElasticProcessorStub) SetTxLogsProcessor(txLogsProc process.Transacti } // SaveAccounts - -func (eim *ElasticProcessorStub) SaveAccounts(acc []state.UserAccountHandler) error { +func (eim *ElasticProcessorStub) SaveAccounts(timestamp uint64, acc []state.UserAccountHandler) error { if eim.SaveAccountsCalled != nil { - return eim.SaveAccountsCalled(acc) + return eim.SaveAccountsCalled(timestamp, acc) } return nil diff --git a/nilIndexer.go b/nilIndexer.go index 73b9973c..0ea6cc6a 100644 --- a/nilIndexer.go +++ b/nilIndexer.go @@ -46,7 +46,7 @@ func (ni *NilIndexer) SaveValidatorsPubKeys(_ map[uint32][][]byte, _ uint32) { } // SaveAccounts won't do anything as this is a nil implementation -func (ni *NilIndexer) SaveAccounts(_ []state.UserAccountHandler) { +func (ni *NilIndexer) SaveAccounts(_ uint64, _ []state.UserAccountHandler) { } // Close will do nothing diff --git a/templates/noKibana/accountsHistory.go b/templates/noKibana/accountsHistory.go index d271de5f..482e4ba5 100644 --- a/templates/noKibana/accountsHistory.go +++ b/templates/noKibana/accountsHistory.go @@ -9,4 +9,11 @@ var AccountsHistory = Object{ "number_of_shards": 5, "number_of_replicas": 0, }, + "mappings": Object{ + "properties": Object{ + "timestamp": Object{ + "type": "date", + }, + }, + }, } diff --git a/templates/withKibana/accountshistory.go b/templates/withKibana/accountshistory.go index 34c8d133..419fb25a 100644 --- a/templates/withKibana/accountshistory.go +++ b/templates/withKibana/accountshistory.go @@ -11,4 +11,11 @@ var AccountsHistory = Object{ "opendistro.index_state_management.policy_id": "accountshistory_policy", "opendistro.index_state_management.rollover_alias": "accountshistory", }, + "mappings": Object{ + "properties": Object{ + "timestamp": Object{ + "type": "date", + }, + }, + }, } diff --git a/workItems/interface.go b/workItems/interface.go index b3096544..9c78bc51 100644 --- a/workItems/interface.go +++ b/workItems/interface.go @@ -41,5 +41,5 @@ type saveValidatorsIndexer interface { } type saveAccountsIndexer interface { - SaveAccounts(accounts []state.UserAccountHandler) error + SaveAccounts(blockTimestamp uint64, accounts []state.UserAccountHandler) error } diff --git a/workItems/workItemAccounts.go b/workItems/workItemAccounts.go index c66dc4aa..012c83c7 100644 --- a/workItems/workItemAccounts.go +++ b/workItems/workItemAccounts.go @@ -3,24 +3,27 @@ package workItems import "github.com/ElrondNetwork/elrond-go/data/state" type itemAccounts struct { - indexer saveAccountsIndexer - accounts []state.UserAccountHandler + indexer saveAccountsIndexer + blockTimestamp uint64 + accounts []state.UserAccountHandler } // NewItemAccounts will create a new instance of itemAccounts func NewItemAccounts( indexer saveAccountsIndexer, + blockTimestamp uint64, accounts []state.UserAccountHandler, ) WorkItemHandler { return &itemAccounts{ - indexer: indexer, - accounts: accounts, + indexer: indexer, + accounts: accounts, + blockTimestamp: blockTimestamp, } } // Save will save information about an account func (wiv *itemAccounts) Save() error { - err := wiv.indexer.SaveAccounts(wiv.accounts) + err := wiv.indexer.SaveAccounts(wiv.blockTimestamp, wiv.accounts) if err != nil { log.Warn("itemAccounts.Save", "could not index account", diff --git a/workItems/workItemAccounts_test.go b/workItems/workItemAccounts_test.go index 852db796..72c9ca78 100644 --- a/workItems/workItemAccounts_test.go +++ b/workItems/workItemAccounts_test.go @@ -14,11 +14,12 @@ func TestItemAccounts_Save(t *testing.T) { called := false itemAccounts := workItems.NewItemAccounts( &mock.ElasticProcessorStub{ - SaveAccountsCalled: func(_ []state.UserAccountHandler) error { + SaveAccountsCalled: func(_ uint64, _ []state.UserAccountHandler) error { called = true return nil }, }, + 0, []state.UserAccountHandler{}, ) require.False(t, itemAccounts.IsInterfaceNil()) @@ -32,10 +33,11 @@ func TestItemAccounts_SaveAccountsShouldErr(t *testing.T) { localErr := errors.New("local err") itemAccounts := workItems.NewItemAccounts( &mock.ElasticProcessorStub{ - SaveAccountsCalled: func(_ []state.UserAccountHandler) error { + SaveAccountsCalled: func(_ uint64, _ []state.UserAccountHandler) error { return localErr }, }, + 0, []state.UserAccountHandler{}, ) require.False(t, itemAccounts.IsInterfaceNil())