Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring: abstract storage layer and reindexer entity #430

Merged
merged 17 commits into from
Jan 8, 2021
Prev Previous commit
Next Next commit
Rebased
aopoltorzhicky committed Jan 8, 2021
commit f976347d328e2a75c6624f1f9eaaad468903fd63
409 changes: 24 additions & 385 deletions cmd/api/docs/docs.go

Large diffs are not rendered by default.

404 changes: 24 additions & 380 deletions cmd/api/docs/swagger.json

Large diffs are not rendered by default.

238 changes: 16 additions & 222 deletions cmd/api/docs/swagger.yaml

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions cmd/api/handlers/dapp.go
Original file line number Diff line number Diff line change
@@ -116,9 +116,9 @@ func (ctx *Context) appendDAppInfo(dapp *tzip.DApp, withDetails bool) (DApp, err
entrypoints = append(entrypoints, c.DexVolumeEntrypoints...)
}

vol, err := ctx.ES.GetToken24HoursVolume(consts.Mainnet, token.Contract, initiators, entrypoints, token.TokenID)
vol, err := ctx.Transfers.GetToken24HoursVolume(consts.Mainnet, token.Contract, initiators, entrypoints, token.TokenID)
if err != nil {
if elastic.IsRecordNotFound(err) {
if ctx.Storage.IsRecordNotFound(err) {
continue
}
return result, err
@@ -153,7 +153,7 @@ func (ctx *Context) appendDAppInfo(dapp *tzip.DApp, withDetails bool) (DApp, err
}
result.Tokens = append(result.Tokens, tokens...)

vol, err := ctx.ES.GetContract24HoursVolume(consts.Mainnet, address.Address, address.DexVolumeEntrypoints)
vol, err := ctx.Operations.GetContract24HoursVolume(consts.Mainnet, address.Address, address.DexVolumeEntrypoints)
if err != nil {
return result, err
}
2 changes: 1 addition & 1 deletion cmd/api/handlers/stats.go
Original file line number Diff line number Diff line change
@@ -245,7 +245,7 @@ func (ctx *Context) getHistogramOptions(name, network string, addresses ...strin
// @Param period query string true "One of periods" Enums(all, year, month, week, day)
// @Accept json
// @Produce json
// @Success 200 {object} contract.DAppStats
// @Success 200 {object} operation.DAppStats
// @Failure 400 {object} Error
// @Failure 500 {object} Error
// @Router /stats/{network}/contracts [get]
4 changes: 2 additions & 2 deletions cmd/api/handlers/swagger.go
Original file line number Diff line number Diff line change
@@ -8,13 +8,13 @@ import (
// GetSwaggerDoc -
func (ctx *Context) GetSwaggerDoc(c *gin.Context) {
doc, err := swag.ReadDoc()
if handleError(c, err, 0) {
if ctx.handleError(c, err, 0) {
return
}

c.Header("Content-Type", "application/json")
_, err = c.Writer.Write([]byte(doc))
if handleError(c, err, 0) {
if ctx.handleError(c, err, 0) {
return
}
}
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751
github.com/aws/aws-sdk-go v1.30.10
github.com/btcsuite/btcutil v1.0.1
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/elastic/go-elasticsearch/v8 v8.0.0-20191218082911-5398a82b748f
github.com/fatih/color v1.9.0
@@ -30,6 +31,7 @@ require (
github.com/nats-io/nats.go v1.10.0
github.com/pkg/errors v0.9.1
github.com/restream/reindexer v3.0.0+incompatible
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/schollz/progressbar/v3 v3.1.1
github.com/sergi/go-diff v1.1.0
github.com/sirupsen/logrus v1.4.2
@@ -46,7 +48,7 @@ require (
golang.org/x/net v0.0.0-20201224014010-6772e930b67b // indirect
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e // indirect
golang.org/x/tools v0.0.0-20201226215659-b1c90890d22a // indirect
golang.org/x/tools v0.0.0-20210107193943-4ed967dd8eff // indirect
google.golang.org/appengine v1.6.6 // indirect
google.golang.org/protobuf v1.25.0 // indirect
gopkg.in/go-playground/validator.v9 v9.31.0
59 changes: 6 additions & 53 deletions go.sum

Large diffs are not rendered by default.

31 changes: 16 additions & 15 deletions internal/elastic/operation/storage.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ import (
"fmt"
"time"

constants "github.com/baking-bad/bcdhub/internal/contractparser/consts"
"github.com/baking-bad/bcdhub/internal/elastic/consts"
"github.com/baking-bad/bcdhub/internal/elastic/core"
"github.com/baking-bad/bcdhub/internal/helpers"
@@ -237,33 +238,33 @@ func (storage *Storage) GetStats(network, address string) (stats operation.Stats

// GetContract24HoursVolume -
func (storage *Storage) GetContract24HoursVolume(network, address string, entrypoints []string) (float64, error) {
query := newQuery().Query(
boolQ(
filter(
boolQ(
should(
matchPhrase("destination", address),
matchPhrase("source", address),
query := core.NewQuery().Query(
core.Bool(
core.Filter(
core.Bool(
core.Should(
core.MatchPhrase("destination", address),
core.MatchPhrase("source", address),
),
minimumShouldMatch(1),
core.MinimumShouldMatch(1),
),
term("network", network),
term("status", consts.Applied),
rangeQ("timestamp", qItem{
core.Term("network", network),
core.Term("status", constants.Applied),
core.Range("timestamp", core.Item{
"lte": "now",
"gt": "now-24h",
}),
in("entrypoint.keyword", entrypoints),
core.In("entrypoint.keyword", entrypoints),
),
),
).Add(
aggs(
aggItem{"volume", sum("amount")},
core.Aggs(
core.AggItem{Name: "volume", Body: core.Sum("amount")},
),
).Zero()

var response aggVolumeSumResponse
if err := e.query([]string{consts.DocOperations}, query, &response); err != nil {
if err := storage.es.Query([]string{models.DocOperations}, query, &response); err != nil {
return 0, err
}

136 changes: 14 additions & 122 deletions internal/elastic/transfer/storage.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"

"github.com/baking-bad/bcdhub/internal/contractparser/consts"
"github.com/baking-bad/bcdhub/internal/elastic/core"
"github.com/baking-bad/bcdhub/internal/models"
"github.com/baking-bad/bcdhub/internal/models/transfer"
@@ -113,140 +114,31 @@ func (storage *Storage) GetTokenSupply(network, address string, tokenID int64) (
return
}

// GetBalances -
func (storage *Storage) GetBalances(network, contract string, level int64, addresses ...transfer.TokenBalance) (map[transfer.TokenBalance]int64, error) {
filters := []core.Item{
core.Match("network", network),
}

if contract != "" {
filters = append(filters, core.MatchPhrase("contract", contract))
}

if level > 0 {
filters = append(filters, core.Range("level", core.Item{
"lt": level,
}))
}

b := core.Bool(
core.Filter(filters...),
)

if len(addresses) > 0 {
addressFilters := make([]core.Item, 0)

for _, a := range addresses {
addressFilters = append(addressFilters, core.Bool(
core.Filter(
core.MatchPhrase("from", a.Address),
core.Term("token_id", a.TokenID),
),
))
}

b.Get("bool").Extend(
core.Should(addressFilters...),
)
b.Get("bool").Extend(core.MinimumShouldMatch(1))
}

query := core.NewQuery().Query(b).Add(
core.Item{
"aggs": core.Item{
"balances": core.Item{
"scripted_metric": core.Item{
"init_script": "state.balances = [:]",
"map_script": `
if (!state.balances.containsKey(doc['from.keyword'].value)) {
state.balances[doc['from.keyword'].value + '_' + doc['token_id'].value] = doc['amount'].value;
} else {
state.balances[doc['from.keyword'].value + '_' + doc['token_id'].value] = state.balances[doc['from.keyword'].value + '_' + doc['token_id'].value] - doc['amount'].value;
}

if (!state.balances.containsKey(doc['to.keyword'].value)) {
state.balances[doc['to.keyword'].value + '_' + doc['token_id'].value] = doc['amount'].value;
} else {
state.balances[doc['to.keyword'].value + '_' + doc['token_id'].value] = state.balances[doc['to.keyword'].value + '_' + doc['token_id'].value] + doc['amount'].value;
}
`,
"combine_script": `
Map balances = [:];
for (entry in state.balances.entrySet()) {
if (!balances.containsKey(entry.getKey())) {
balances[entry.getKey()] = entry.getValue();
} else {
balances[entry.getKey()] = balances[entry.getKey()] + entry.getValue();
}
}
return balances;
`,
"reduce_script": `
Map balances = [:];
for (state in states) {
for (entry in state.entrySet()) {
if (!balances.containsKey(entry.getKey())) {
balances[entry.getKey()] = entry.getValue();
} else {
balances[entry.getKey()] = balances[entry.getKey()] + entry.getValue();
}
}
}
return balances;
`,
},
},
},
},
).Zero()
var response getAccountBalancesResponse
if err := storage.es.Query([]string{models.DocTransfers}, query, &response); err != nil {
return nil, err
}

balances := make(map[transfer.TokenBalance]int64)
for key, balance := range response.Agg.Balances.Value {
parts := strings.Split(key, "_")
if len(parts) != 2 {
return nil, errors.Errorf("Invalid addressToken key split size: %d", len(parts))
}
tokenID, err := strconv.ParseInt(parts[1], 10, 64)
if err != nil {
return nil, err
}
balances[transfer.TokenBalance{
Address: parts[0],
TokenID: tokenID,
}] = int64(balance)
}
return balances, nil
}

// GetToken24HoursVolume - returns token volume for last 24 hours
func (storage *Storage) GetToken24HoursVolume(network, contract string, initiators, entrypoints []string, tokenID int64) (float64, error) {
query := newQuery().Query(
boolQ(
filter(
term("contract.keyword", contract),
term("network", network),
term("status", consts.Applied),
term("token_id", tokenID),
rangeQ("timestamp", qItem{
query := core.NewQuery().Query(
core.Bool(
core.Filter(
core.Term("contract.keyword", contract),
core.Term("network", network),
core.Term("status", consts.Applied),
core.Term("token_id", tokenID),
core.Range("timestamp", core.Item{
"lte": "now",
"gt": "now-24h",
}),
in("parent.keyword", entrypoints),
in("initiator.keyword", initiators),
core.In("parent.keyword", entrypoints),
core.In("initiator.keyword", initiators),
),
),
).Add(
aggs(
aggItem{"volume", sum("amount")},
core.Aggs(
core.AggItem{Name: "volume", Body: core.Sum("amount")},
),
).Zero()

var response aggVolumeSumResponse
if err := e.query([]string{consts.DocTransfers}, query, &response); err != nil {
if err := storage.es.Query([]string{models.DocTransfers}, query, &response); err != nil {
return 0, err
}

5 changes: 5 additions & 0 deletions internal/reindexer/operation/storage.go
Original file line number Diff line number Diff line change
@@ -350,3 +350,8 @@ func periodToRange(period string, query *reindexer.Query) error {
}
return nil
}

// GetContract24HoursVolume -
func (storage *Storage) GetContract24HoursVolume(network, address string, entrypoints []string) (float64, error) {
return 0, nil
}
5 changes: 5 additions & 0 deletions internal/reindexer/transfer/storage.go
Original file line number Diff line number Diff line change
@@ -89,3 +89,8 @@ func (storage *Storage) GetTokenSupply(network, address string, tokenID int64) (
func (storage *Storage) GetTokenVolumeSeries(network, period string, contracts []string, entrypoints []tzip.DAppContract, tokenID uint) ([][]int64, error) {
return nil, nil
}

// GetToken24HoursVolume -
func (storage *Storage) GetToken24HoursVolume(network, contract string, initiators, entrypoints []string, tokenID int64) (float64, error) {
return 0, nil
}