Skip to content

Commit

Permalink
Merge pull request #6 from ElrondNetwork/es-accounts-history-fix
Browse files Browse the repository at this point in the history
fixed the timestamp issue on accounts-history
  • Loading branch information
bogdan-rosianu authored Feb 23, 2021
2 parents b058294 + c5af1b1 commit 085fa2c
Show file tree
Hide file tree
Showing 13 changed files with 53 additions and 28 deletions.
6 changes: 3 additions & 3 deletions data.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ type AccountInfo struct {

// AccountBalanceHistory represents an entry in the user accounts balances history
type AccountBalanceHistory struct {
Address string `json:"address"`
Timestamp int64 `json:"timestamp"`
Balance string `json:"balance"`
Address string `json:"address"`
Timestamp time.Duration `json:"timestamp"`
Balance string `json:"balance"`
}

// ValidatorsRatingInfo is a structure containing validators information
Expand Down
4 changes: 2 additions & 2 deletions dataIndexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ func (di *dataIndexer) UpdateTPS(tpsBenchmark statistics.TPSBenchmark) {
}

// SaveAccounts will save the provided accounts
func (di *dataIndexer) SaveAccounts(accounts []state.UserAccountHandler) {
wi := workItems.NewItemAccounts(di.elasticProcessor, accounts)
func (di *dataIndexer) SaveAccounts(timestamp uint64, accounts []state.UserAccountHandler) {
wi := workItems.NewItemAccounts(di.elasticProcessor, timestamp, accounts)
di.dispatcher.Add(wi)
}

Expand Down
17 changes: 8 additions & 9 deletions elasticProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func (ei *elasticProcessor) SaveTransactions(
}
}

return ei.indexAlteredAccounts(alteredAccounts)
return ei.indexAlteredAccounts(header.GetTimeStamp(), alteredAccounts)
}

// SaveShardStatistics will prepare and save information about a shard statistics in elasticsearch server
Expand Down Expand Up @@ -492,7 +492,7 @@ func (ei *elasticProcessor) SaveRoundsInfo(infos []workItems.RoundInfo) error {
return ei.elasticClient.DoBulkRequest(&buff, roundIndex)
}

func (ei *elasticProcessor) indexAlteredAccounts(accounts map[string]struct{}) error {
func (ei *elasticProcessor) indexAlteredAccounts(blockTimestamp uint64, accounts map[string]struct{}) error {
if !ei.isIndexEnabled(accountsIndex) {
return nil
}
Expand Down Expand Up @@ -529,11 +529,11 @@ func (ei *elasticProcessor) indexAlteredAccounts(accounts map[string]struct{}) e
return nil
}

return ei.SaveAccounts(accountsToIndex)
return ei.SaveAccounts(blockTimestamp, accountsToIndex)
}

// SaveAccounts will prepare and save information about provided accounts in elasticsearch server
func (ei *elasticProcessor) SaveAccounts(accounts []state.UserAccountHandler) error {
func (ei *elasticProcessor) SaveAccounts(blockTimestamp uint64, accounts []state.UserAccountHandler) error {
if !ei.isIndexEnabled(accountsIndex) {
return nil
}
Expand Down Expand Up @@ -563,23 +563,22 @@ func (ei *elasticProcessor) SaveAccounts(accounts []state.UserAccountHandler) er
}
}

return ei.saveAccountsHistory(accountsMap)
return ei.saveAccountsHistory(blockTimestamp, accountsMap)
}

func (ei *elasticProcessor) saveAccountsHistory(accountsInfoMap map[string]*AccountInfo) error {
func (ei *elasticProcessor) saveAccountsHistory(blockTimestamp uint64, accountsInfoMap map[string]*AccountInfo) error {
if !ei.isIndexEnabled(accountsHistoryIndex) {
return nil
}

currentTimestamp := time.Now().Unix()
accountsMap := make(map[string]*AccountBalanceHistory)
for address, userAccount := range accountsInfoMap {
acc := &AccountBalanceHistory{
Address: address,
Balance: userAccount.Balance,
Timestamp: currentTimestamp,
Timestamp: time.Duration(blockTimestamp),
}
addressKey := fmt.Sprintf("%s_%d", address, currentTimestamp)
addressKey := fmt.Sprintf("%s_%d", address, blockTimestamp)
accountsMap[addressKey] = acc
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/ElrondNetwork/elastic-indexer-go
go 1.15

require (
github.com/ElrondNetwork/elrond-go v1.1.28-0.20210217095315-1dac606233cf
github.com/ElrondNetwork/elrond-go v1.1.28-0.20210223140840-1bf7cdc66a37
github.com/ElrondNetwork/elrond-go-logger v1.0.4
github.com/elastic/go-elasticsearch/v7 v7.10.0
github.com/stretchr/testify v1.7.0
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ github.com/ElrondNetwork/concurrent-map v0.1.3 h1:j2LtPrNJuerannC1cQDE79STvi/P04
github.com/ElrondNetwork/concurrent-map v0.1.3/go.mod h1:3XwSwn4JHI0lrKxWLZvtp53Emr8BXYTmNQGwcukHJEE=
github.com/ElrondNetwork/elastic-indexer-go v0.0.0-20210205133807-18493825ad60/go.mod h1:ta15pl+Ylin1MyuvbfJoSekIjgp+MEXcD8UnWPmtZuM=
github.com/ElrondNetwork/elastic-indexer-go v0.0.0-20210216113404-43866fbd8396/go.mod h1:WVamXFwP3zT6sQoSVeYfdwH05ayzTvreED4Dst4Qkdg=
github.com/ElrondNetwork/elastic-indexer-go v1.0.0/go.mod h1:OGSmPbjG3DncL6xXoXIbyYvF38yTgyzeTL1syJel9w0=
github.com/ElrondNetwork/elastic-indexer-go v1.0.1-0.20210223135710-293bb7ce1956/go.mod h1:5yn0oHWYlTiotK2qMpb3B7Cib/Fj3un1SW+tPoga4D4=
github.com/ElrondNetwork/elrond-go v1.1.0/go.mod h1:R5HRN9NnnJ0gRd4QogmbUbQNTEczLsFCtfIgd3+3cYY=
github.com/ElrondNetwork/elrond-go v1.1.6-0.20201113091116-de0c9a4e63c6/go.mod h1:dUNMqGg/jqTgHbnNxAmW7WkApWXlCQ8brVPbwIEFghI=
github.com/ElrondNetwork/elrond-go v1.1.6-0.20201113120119-5f3406f2d6b5/go.mod h1:JsrmzX05L/GHE/nOeQuJCKHasJnZN2WMFr+Y6jjvHGg=
Expand All @@ -38,6 +40,10 @@ github.com/ElrondNetwork/elrond-go v1.1.28-0.20210211124017-b6038647eb13 h1:QFfw
github.com/ElrondNetwork/elrond-go v1.1.28-0.20210211124017-b6038647eb13/go.mod h1:wriFQjR/rABM+HkhPB8BA7qRVyOmPcVXJA5/ThqJ0hc=
github.com/ElrondNetwork/elrond-go v1.1.28-0.20210217095315-1dac606233cf h1:lDzF/+Tkrc4xO/6TgUMY5li0uYmFgttBsGsMxN81FAw=
github.com/ElrondNetwork/elrond-go v1.1.28-0.20210217095315-1dac606233cf/go.mod h1:gkJB13DqJ5pbzhPUM0iocWVB6LRUEnZZYptC3z3jLzs=
github.com/ElrondNetwork/elrond-go v1.1.28-0.20210223113334-ca37e112bec9 h1:owfjul9Wu0aHgeJk+g1JkAKB2xZwNU6mNuarCqmQ9ME=
github.com/ElrondNetwork/elrond-go v1.1.28-0.20210223113334-ca37e112bec9/go.mod h1:OI98prLTyuTK2Yglm3lX2DcobtiY61JRKLVu/GxeHCo=
github.com/ElrondNetwork/elrond-go v1.1.28-0.20210223140840-1bf7cdc66a37 h1:fY1eqts0HbfUlypV4PDf7lczyRiY2SGdgy+qEu0nzdc=
github.com/ElrondNetwork/elrond-go v1.1.28-0.20210223140840-1bf7cdc66a37/go.mod h1:n12dVuNcz8xMm6X9jI81GZqW9Z1dYWTIK0Osfo+YMVE=
github.com/ElrondNetwork/elrond-go-logger v1.0.2/go.mod h1:e5D+c97lKUfFdAzFX7rrI2Igl/z4Y0RkKYKWyzprTGk=
github.com/ElrondNetwork/elrond-go-logger v1.0.4 h1:i5Yu4qyjTZDwvBY/ykbNpp2SP9jxwk/QTivRwSZSTAQ=
github.com/ElrondNetwork/elrond-go-logger v1.0.4/go.mod h1:e5D+c97lKUfFdAzFX7rrI2Igl/z4Y0RkKYKWyzprTGk=
Expand Down Expand Up @@ -347,6 +353,7 @@ github.com/libp2p/go-libp2p-core v0.6.0/go.mod h1:txwbVEhHEXikXn9gfC7/UDDw7rkxuX
github.com/libp2p/go-libp2p-core v0.6.1/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-core v0.7.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-core v0.8.5/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI=
github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg=
github.com/libp2p/go-libp2p-discovery v0.3.0/go.mod h1:o03drFnz9BVAZdzC/QUQ+NeQOu38Fu7LJGEOK2gQltw=
Expand Down
2 changes: 1 addition & 1 deletion interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type ElasticProcessor interface {
SaveRoundsInfo(infos []workItems.RoundInfo) error
SaveShardValidatorsPubKeys(shardID, epoch uint32, shardValidatorsPubKeys [][]byte) error
SetTxLogsProcessor(txLogsProc process.TransactionLogProcessorDatabase)
SaveAccounts(accounts []state.UserAccountHandler) error
SaveAccounts(blockTimestamp uint64, accounts []state.UserAccountHandler) error
IsInterfaceNil() bool
}

Expand Down
6 changes: 3 additions & 3 deletions mock/elasticProcessorStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type ElasticProcessorStub struct {
SaveRoundsInfoCalled func(infos []workItems.RoundInfo) error
SaveShardValidatorsPubKeysCalled func(shardID, epoch uint32, shardValidatorsPubKeys [][]byte) error
SetTxLogsProcessorCalled func(txLogsProc process.TransactionLogProcessorDatabase)
SaveAccountsCalled func(acc []state.UserAccountHandler) error
SaveAccountsCalled func(timestamp uint64, acc []state.UserAccountHandler) error
}

// SaveShardStatistics -
Expand Down Expand Up @@ -104,9 +104,9 @@ func (eim *ElasticProcessorStub) SetTxLogsProcessor(txLogsProc process.Transacti
}

// SaveAccounts -
func (eim *ElasticProcessorStub) SaveAccounts(acc []state.UserAccountHandler) error {
func (eim *ElasticProcessorStub) SaveAccounts(timestamp uint64, acc []state.UserAccountHandler) error {
if eim.SaveAccountsCalled != nil {
return eim.SaveAccountsCalled(acc)
return eim.SaveAccountsCalled(timestamp, acc)
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion nilIndexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (ni *NilIndexer) SaveValidatorsPubKeys(_ map[uint32][][]byte, _ uint32) {
}

// SaveAccounts won't do anything as this is a nil implementation
func (ni *NilIndexer) SaveAccounts(_ []state.UserAccountHandler) {
func (ni *NilIndexer) SaveAccounts(_ uint64, _ []state.UserAccountHandler) {
}

// Close will do nothing
Expand Down
7 changes: 7 additions & 0 deletions templates/noKibana/accountsHistory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,11 @@ var AccountsHistory = Object{
"number_of_shards": 5,
"number_of_replicas": 0,
},
"mappings": Object{
"properties": Object{
"timestamp": Object{
"type": "date",
},
},
},
}
7 changes: 7 additions & 0 deletions templates/withKibana/accountshistory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,11 @@ var AccountsHistory = Object{
"opendistro.index_state_management.policy_id": "accountshistory_policy",
"opendistro.index_state_management.rollover_alias": "accountshistory",
},
"mappings": Object{
"properties": Object{
"timestamp": Object{
"type": "date",
},
},
},
}
2 changes: 1 addition & 1 deletion workItems/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,5 @@ type saveValidatorsIndexer interface {
}

type saveAccountsIndexer interface {
SaveAccounts(accounts []state.UserAccountHandler) error
SaveAccounts(blockTimestamp uint64, accounts []state.UserAccountHandler) error
}
13 changes: 8 additions & 5 deletions workItems/workItemAccounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,27 @@ package workItems
import "github.com/ElrondNetwork/elrond-go/data/state"

type itemAccounts struct {
indexer saveAccountsIndexer
accounts []state.UserAccountHandler
indexer saveAccountsIndexer
blockTimestamp uint64
accounts []state.UserAccountHandler
}

// NewItemAccounts will create a new instance of itemAccounts
func NewItemAccounts(
indexer saveAccountsIndexer,
blockTimestamp uint64,
accounts []state.UserAccountHandler,
) WorkItemHandler {
return &itemAccounts{
indexer: indexer,
accounts: accounts,
indexer: indexer,
accounts: accounts,
blockTimestamp: blockTimestamp,
}
}

// Save will save information about an account
func (wiv *itemAccounts) Save() error {
err := wiv.indexer.SaveAccounts(wiv.accounts)
err := wiv.indexer.SaveAccounts(wiv.blockTimestamp, wiv.accounts)
if err != nil {
log.Warn("itemAccounts.Save",
"could not index account",
Expand Down
6 changes: 4 additions & 2 deletions workItems/workItemAccounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ func TestItemAccounts_Save(t *testing.T) {
called := false
itemAccounts := workItems.NewItemAccounts(
&mock.ElasticProcessorStub{
SaveAccountsCalled: func(_ []state.UserAccountHandler) error {
SaveAccountsCalled: func(_ uint64, _ []state.UserAccountHandler) error {
called = true
return nil
},
},
0,
[]state.UserAccountHandler{},
)
require.False(t, itemAccounts.IsInterfaceNil())
Expand All @@ -32,10 +33,11 @@ func TestItemAccounts_SaveAccountsShouldErr(t *testing.T) {
localErr := errors.New("local err")
itemAccounts := workItems.NewItemAccounts(
&mock.ElasticProcessorStub{
SaveAccountsCalled: func(_ []state.UserAccountHandler) error {
SaveAccountsCalled: func(_ uint64, _ []state.UserAccountHandler) error {
return localErr
},
},
0,
[]state.UserAccountHandler{},
)
require.False(t, itemAccounts.IsInterfaceNil())
Expand Down

0 comments on commit 085fa2c

Please sign in to comment.