diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 26ab0a6ff..cdd41d5be 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -83,7 +83,7 @@ jobs: context: . file: build/sandbox/Dockerfile build-args: | - TAG=4.5.4 + TAG=4.5.7 push: true cache-from: type=gha cache-to: type=gha,mode=max diff --git a/.github/workflows/tests_master.yml b/.github/workflows/tests_master.yml index 926772242..f8522deaa 100644 --- a/.github/workflows/tests_master.yml +++ b/.github/workflows/tests_master.yml @@ -11,7 +11,7 @@ jobs: steps: - uses: actions/setup-go@v3 with: - go-version: '1.19' + go-version: '1.20' - uses: actions/checkout@v3 - name: golangci-lint uses: golangci/golangci-lint-action@v3 @@ -25,7 +25,7 @@ jobs: - name: install Go uses: actions/setup-go@v2 with: - go-version: 1.19.x + go-version: 1.20.x - name: checkout code uses: actions/checkout@v2 - uses: actions/cache@v2 diff --git a/.github/workflows/tests_pr.yml b/.github/workflows/tests_pr.yml index 4579e96b7..a8682e9b5 100644 --- a/.github/workflows/tests_pr.yml +++ b/.github/workflows/tests_pr.yml @@ -11,7 +11,7 @@ jobs: steps: - uses: actions/setup-go@v3 with: - go-version: '1.19' + go-version: '1.20' - uses: actions/checkout@v3 - name: golangci-lint uses: golangci/golangci-lint-action@v3 @@ -25,7 +25,7 @@ jobs: - name: install Go uses: actions/setup-go@v2 with: - go-version: 1.19.x + go-version: 1.20.x - name: checkout code uses: actions/checkout@v2 - uses: actions/cache@v2 diff --git a/build/api/Dockerfile b/build/api/Dockerfile index 6ec53f562..996cadf97 100644 --- a/build/api/Dockerfile +++ b/build/api/Dockerfile @@ -1,7 +1,7 @@ # --------------------------------------------------------------------- # The first stage container, for building the application # --------------------------------------------------------------------- -FROM golang:1.19-alpine as builder +FROM golang:1.20-alpine as builder ENV CGO_ENABLED=0 ENV GO111MODULE=on diff --git a/build/indexer/Dockerfile b/build/indexer/Dockerfile index 727191184..e0570d8a5 100644 --- a/build/indexer/Dockerfile +++ b/build/indexer/Dockerfile @@ -1,7 +1,7 @@ # --------------------------------------------------------------------- # The first stage container, for building the application # --------------------------------------------------------------------- -FROM golang:1.19-alpine as builder +FROM golang:1.20-alpine as builder ENV CGO_ENABLED=0 ENV GO111MODULE=on diff --git a/build/sandbox/Dockerfile b/build/sandbox/Dockerfile index d2a3377c3..76067c03c 100644 --- a/build/sandbox/Dockerfile +++ b/build/sandbox/Dockerfile @@ -1,4 +1,4 @@ -FROM node:14 AS build +FROM node:16 AS build ARG TAG RUN git clone --depth=1 --branch ${TAG} https://github.com/baking-bad/bcd.git /bcd diff --git a/cmd/api/handlers/entrypoints.go b/cmd/api/handlers/entrypoints.go index 652d56931..136f2d83d 100644 --- a/cmd/api/handlers/entrypoints.go +++ b/cmd/api/handlers/entrypoints.go @@ -5,6 +5,7 @@ import ( "github.com/baking-bad/bcdhub/internal/bcd" "github.com/baking-bad/bcdhub/internal/bcd/ast" + "github.com/baking-bad/bcdhub/internal/bcd/encoding" "github.com/baking-bad/bcdhub/internal/bcd/formatter" "github.com/baking-bad/bcdhub/internal/bcd/types" "github.com/baking-bad/bcdhub/internal/config" @@ -151,6 +152,18 @@ func GetEntrypointSchema() gin.HandlerFunc { return } + var ( + hash []byte + err error + ) + + if esReq.Hash != "" { + hash, err = encoding.DecodeBase58(esReq.Hash) + if handleError(c, 
ctx.Storage, err, http.StatusBadRequest) { + return + } + } + symLink, err := getCurrentSymLink(ctx.Blocks) if handleError(c, ctx.Storage, err, 0) { return @@ -207,7 +220,7 @@ func GetEntrypointSchema() gin.HandlerFunc { return } - opg, err := ctx.Operations.GetByHashAndCounter(esReq.Hash, int64(*esReq.Counter)) + opg, err := ctx.Operations.GetByHashAndCounter(hash, int64(*esReq.Counter)) if handleError(c, ctx.Storage, err, 0) { return } diff --git a/cmd/api/handlers/error.go b/cmd/api/handlers/error.go index d4aa24aec..3e4882624 100644 --- a/cmd/api/handlers/error.go +++ b/cmd/api/handlers/error.go @@ -7,6 +7,7 @@ import ( "github.com/baking-bad/bcdhub/internal/bcd/ast" "github.com/baking-bad/bcdhub/internal/logger" "github.com/baking-bad/bcdhub/internal/models" + "github.com/baking-bad/bcdhub/internal/noderpc" sentrygin "github.com/getsentry/sentry-go/gin" "github.com/gin-gonic/gin" jsoniter "github.com/json-iterator/go" @@ -14,6 +15,10 @@ import ( var json = jsoniter.ConfigCompatibleWithStandardLibrary +func skipError(err error) bool { + return errors.Is(err, noderpc.ErrNodeRPCError) +} + func handleError(c *gin.Context, repo models.GeneralRepository, err error, code int) bool { if err == nil { return false @@ -24,12 +29,13 @@ func handleError(c *gin.Context, repo models.GeneralRepository, err error, code err = errors.New("invalid authentication") case 0: code = getErrorCode(err, repo) - if code == http.StatusInternalServerError { + if code == http.StatusInternalServerError && !skipError(err) { if hub := sentrygin.GetHubFromContext(c); hub != nil { hub.CaptureMessage(err.Error()) } - logger.Err(err) } + + logger.Err(err) } c.AbortWithStatusJSON(code, getErrorMessage(err, repo)) diff --git a/cmd/api/handlers/migrations.go b/cmd/api/handlers/migrations.go index d9e497b32..68bfd8555 100644 --- a/cmd/api/handlers/migrations.go +++ b/cmd/api/handlers/migrations.go @@ -3,6 +3,7 @@ package handlers import ( "net/http" + "github.com/baking-bad/bcdhub/internal/bcd/encoding" "github.com/baking-bad/bcdhub/internal/config" "github.com/baking-bad/bcdhub/internal/models/migration" "github.com/gin-gonic/gin" @@ -61,10 +62,14 @@ func prepareMigrations(ctx *config.Context, data []migration.Migration) ([]Migra if err != nil && !ctx.Storage.IsRecordNotFound(err) { return nil, err } + var hash string + if len(data[i].Hash) > 0 { + hash = encoding.MustEncodeOperationHash(data[i].Hash) + } result[i] = Migration{ Level: data[i].Level, Timestamp: data[i].Timestamp, - Hash: data[i].Hash, + Hash: hash, Protocol: proto.Hash, PrevProtocol: prevProto.Hash, Kind: data[i].Kind.String(), diff --git a/cmd/api/handlers/operations.go b/cmd/api/handlers/operations.go index 5aafad551..681e2dbbc 100644 --- a/cmd/api/handlers/operations.go +++ b/cmd/api/handlers/operations.go @@ -7,6 +7,7 @@ import ( "github.com/baking-bad/bcdhub/internal/bcd" "github.com/baking-bad/bcdhub/internal/bcd/ast" "github.com/baking-bad/bcdhub/internal/bcd/consts" + "github.com/baking-bad/bcdhub/internal/bcd/encoding" "github.com/baking-bad/bcdhub/internal/bcd/formatter" formattererror "github.com/baking-bad/bcdhub/internal/bcd/formatter/error" "github.com/baking-bad/bcdhub/internal/bcd/tezerrors" @@ -17,6 +18,7 @@ import ( "github.com/baking-bad/bcdhub/internal/models/operation" modelTypes "github.com/baking-bad/bcdhub/internal/models/types" 
"github.com/baking-bad/bcdhub/internal/parsers/storage" + "github.com/baking-bad/bcdhub/internal/postgres/core" "github.com/gin-gonic/gin" "github.com/pkg/errors" "github.com/tidwall/gjson" @@ -113,9 +115,14 @@ func GetOperation() gin.HandlerFunc { operations := make([]operation.Operation, 0) var foundContext *config.Context + hash, err := encoding.DecodeBase58(req.Hash) + if handleError(c, any.Storage, err, http.StatusBadRequest) { + return + } + network := modelTypes.NewNetwork(queryReq.Network) if ctx, ok := ctxs[network]; ok { - op, err := ctx.Operations.GetByHash(req.Hash) + op, err := ctx.Operations.GetByHash(hash) if err != nil { if !ctx.Storage.IsRecordNotFound(err) { handleError(c, ctx.Storage, err, 0) @@ -127,7 +134,7 @@ func GetOperation() gin.HandlerFunc { } } else { for _, ctx := range ctxs { - op, err := ctx.Operations.GetByHash(req.Hash) + op, err := ctx.Operations.GetByHash(hash) if err != nil { if !ctx.Storage.IsRecordNotFound(err) { handleError(c, ctx.Storage, err, 0) @@ -383,19 +390,24 @@ func GetByHashAndCounter() gin.HandlerFunc { return } + hash, err := encoding.DecodeBase58(req.Hash) + if handleError(c, ctxs.Any().Storage, err, http.StatusBadRequest) { + return + } + var opg []operation.Operation var foundContext *config.Context ctx, err := ctxs.Get(modelTypes.NewNetwork(args.Network)) if err == nil { - opg, err = ctx.Operations.GetByHashAndCounter(req.Hash, req.Counter) + opg, err = ctx.Operations.GetByHashAndCounter(hash, req.Counter) if handleError(c, ctx.Storage, err, 0) { return } foundContext = ctx } else { for _, ctx := range ctxs { - opg, err = ctx.Operations.GetByHashAndCounter(req.Hash, req.Counter) + opg, err = ctx.Operations.GetByHashAndCounter(hash, req.Counter) if handleError(c, ctx.Storage, err, 0) { return } @@ -608,6 +620,9 @@ func getStorageDiff(ctx *config.Context, destinationID int64, bmd []bigmapdiff.B map[string]interface{}{ "destination_id": destinationID, "status": modelTypes.OperationStatusApplied, + "timestamp": core.TimestampFilter{ + Lt: op.Timestamp, + }, }, op.ID) if err == nil { prevStorage = &ast.TypedAst{ diff --git a/cmd/api/handlers/responses.go b/cmd/api/handlers/responses.go index f65641f3f..f1bb234c2 100644 --- a/cmd/api/handlers/responses.go +++ b/cmd/api/handlers/responses.go @@ -5,6 +5,7 @@ import ( "time" "github.com/baking-bad/bcdhub/internal/bcd/ast" + "github.com/baking-bad/bcdhub/internal/bcd/encoding" "github.com/baking-bad/bcdhub/internal/bcd/formatter" "github.com/baking-bad/bcdhub/internal/bcd/tezerrors" "github.com/baking-bad/bcdhub/internal/models/account" @@ -67,7 +68,9 @@ type Operation struct { // FromModel - func (o *Operation) FromModel(operation operation.Operation) { o.ID = operation.ID - o.Hash = operation.Hash + if len(operation.Hash) > 0 { + o.Hash = encoding.MustEncodeOperationHash(operation.Hash) + } o.Internal = operation.Internal o.Timestamp = operation.Timestamp.UTC() @@ -97,9 +100,13 @@ func (o *Operation) FromModel(operation operation.Operation) { // ToModel - func (o *Operation) ToModel() operation.Operation { + var hash []byte + if o.Hash != "" { + hash = encoding.MustDecodeBase58(o.Hash) + } return operation.Operation{ ID: o.ID, - Hash: o.Hash, + Hash: hash, Internal: o.Internal, Timestamp: o.Timestamp, Level: o.Level, @@ -604,6 +611,10 @@ type OPGResponse struct { // NewOPGResponse - func NewOPGResponse(opg operation.OPG) OPGResponse { + var hash string + if len(opg.Hash) > 0 { + hash = 
encoding.MustEncodeOperationHash(opg.Hash) + } return OPGResponse{ LastID: opg.LastID, ContentIndex: opg.ContentIndex, @@ -612,7 +623,7 @@ func NewOPGResponse(opg operation.OPG) OPGResponse { TotalCost: opg.TotalCost, Flow: opg.Flow, Internals: opg.Internals, - Hash: opg.Hash, + Hash: hash, Entrypoint: opg.Entrypoint, Timestamp: opg.Timestamp, Status: opg.Status.String(), @@ -636,8 +647,13 @@ func NewEvent(o operation.Operation) (*Event, error) { return nil, nil } + var hash string + if len(o.Hash) > 0 { + hash = encoding.MustEncodeOperationHash(o.Hash) + } + e := &Event{ - Hash: o.Hash, + Hash: hash, Status: o.Status.String(), Timestamp: o.Timestamp, Level: o.Level, diff --git a/cmd/api/handlers/run_code.go b/cmd/api/handlers/run_code.go index 223170755..74a15aee1 100644 --- a/cmd/api/handlers/run_code.go +++ b/cmd/api/handlers/run_code.go @@ -94,7 +94,7 @@ func RunOperation() gin.HandlerFunc { } parser := operations.NewGroup(parserParams) - store := postgres.NewStore(ctx.StorageDB.DB) + store := postgres.NewStore(ctx.StorageDB.DB, ctx.Partitions) if err := parser.Parse(response, store); handleError(c, ctx.Storage, err, 0) { return } @@ -304,7 +304,7 @@ func parseBigMapDiffs(c *gin.Context, ctx *config.Context, response noderpc.RunC }, } - store := postgres.NewStore(ctx.StorageDB.DB) + store := postgres.NewStore(ctx.StorageDB.DB, ctx.Partitions) switch operation.Kind { case types.OperationKindTransaction.String(): err = specific.StorageParser.ParseTransaction(nodeOperation, &model, store) diff --git a/cmd/api/handlers/tickets.go b/cmd/api/handlers/tickets.go index 4faa42e63..d96a07f32 100644 --- a/cmd/api/handlers/tickets.go +++ b/cmd/api/handlers/tickets.go @@ -4,6 +4,7 @@ import ( "net/http" "github.com/baking-bad/bcdhub/internal/bcd/ast" + "github.com/baking-bad/bcdhub/internal/bcd/encoding" "github.com/baking-bad/bcdhub/internal/config" "github.com/gin-gonic/gin" ) @@ -72,7 +73,9 @@ func GetContractTicketUpdates() gin.HandlerFunc { if handleError(c, ctx.Storage, err, 0) { return } - update.OperationHash = operation.Hash + if len(operation.Hash) > 0 { + update.OperationHash = encoding.MustEncodeOperationHash(operation.Hash) + } response = append(response, update) } diff --git a/cmd/api/handlers/views.go b/cmd/api/handlers/views.go index bb63702b3..21d5443ca 100644 --- a/cmd/api/handlers/views.go +++ b/cmd/api/handlers/views.go @@ -2,7 +2,6 @@ package handlers import ( "context" - "errors" "io" "net/http" "time" @@ -15,6 +14,7 @@ import ( "github.com/baking-bad/bcdhub/internal/models/contract" "github.com/baking-bad/bcdhub/internal/views" "github.com/gin-gonic/gin" + "github.com/pkg/errors" ) var ( diff --git a/cmd/indexer/indexer/indexer.go b/cmd/indexer/indexer/indexer.go index 780b47494..2cf75a1ab 100644 --- a/cmd/indexer/indexer/indexer.go +++ b/cmd/indexer/indexer/indexer.go @@ -254,7 +254,7 @@ func (bi *BlockchainIndexer) handleBlock(ctx context.Context, block *Block) erro } } - store := postgres.NewStore(tx) + store := postgres.NewStore(tx, bi.Partitions) if err := bi.implicitMigration(ctx, block, bi.currentProtocol, store); err != nil { return err } @@ -263,7 +263,7 @@ func (bi *BlockchainIndexer) handleBlock(ctx context.Context, block *Block) erro return err } - if err := store.Save(); err != nil { + if err := store.Save(ctx); err != nil { return err } @@ -543,7 +543,7 @@ func (bi *BlockchainIndexer) vestingMigration(ctx context.Context, head noderpc. 
return err } - store := postgres.NewStore(tx) + store := postgres.NewStore(tx, bi.Partitions) for _, address := range addresses { if !bcd.IsContract(address) { @@ -560,7 +560,7 @@ func (bi *BlockchainIndexer) vestingMigration(ctx context.Context, head noderpc. } } - return store.Save() + return store.Save(ctx) } func (bi *BlockchainIndexer) reinit(ctx context.Context, cfg config.Config, indexerConfig config.IndexerConfig) error { diff --git a/cmd/indexer/indexer/indices.go b/cmd/indexer/indexer/indices.go index 0f546c8bc..7db5a97e8 100644 --- a/cmd/indexer/indexer/indices.go +++ b/cmd/indexer/indexer/indices.go @@ -27,11 +27,11 @@ func createStartIndices(db pg.DBI) error { } // Big map diff - if _, err := db.Model((*bigmapdiff.BigMapDiff)(nil)).Exec(`CREATE INDEX CONCURRENTLY IF NOT EXISTS big_map_diff_idx ON ?TableName (contract, ptr)`); err != nil { + if _, err := db.Model((*bigmapdiff.BigMapDiff)(nil)).Exec(`CREATE INDEX IF NOT EXISTS big_map_diff_idx ON ?TableName (contract, ptr)`); err != nil { return err } - if _, err := db.Model((*bigmapdiff.BigMapDiff)(nil)).Exec(`CREATE INDEX CONCURRENTLY IF NOT EXISTS big_map_diff_key_hash_idx ON ?TableName (key_hash, ptr)`); err != nil { + if _, err := db.Model((*bigmapdiff.BigMapDiff)(nil)).Exec(`CREATE INDEX IF NOT EXISTS big_map_diff_key_hash_idx ON ?TableName (key_hash, ptr)`); err != nil { return err } @@ -54,11 +54,11 @@ func createStartIndices(db pg.DBI) error { } // Operations - if _, err := db.Model((*operation.Operation)(nil)).Exec(`CREATE INDEX CONCURRENTLY IF NOT EXISTS operations_destination_idx ON ?TableName (destination_id)`); err != nil { + if _, err := db.Model((*operation.Operation)(nil)).Exec(`CREATE INDEX IF NOT EXISTS operations_destination_idx ON ?TableName (destination_id)`); err != nil { return err } - if _, err := db.Model((*operation.Operation)(nil)).Exec(`CREATE INDEX CONCURRENTLY IF NOT EXISTS operations_status_idx ON ?TableName (status)`); err != nil { + if _, err := db.Model((*operation.Operation)(nil)).Exec(`CREATE INDEX IF NOT EXISTS operations_status_idx ON ?TableName (status)`); err != nil { return err } @@ -90,19 +90,19 @@ func (bi *BlockchainIndexer) createIndices() { // Big map diff if _, err := bi.Context.StorageDB.DB.Model((*bigmapdiff.BigMapDiff)(nil)).Exec(` - CREATE INDEX CONCURRENTLY IF NOT EXISTS big_map_diff_operation_id_idx ON ?TableName (operation_id) + CREATE INDEX IF NOT EXISTS big_map_diff_operation_id_idx ON ?TableName (operation_id) `); err != nil { logger.Error().Err(err).Msg("can't create index") } if _, err := bi.Context.StorageDB.DB.Model((*bigmapdiff.BigMapDiff)(nil)).Exec(` - CREATE INDEX CONCURRENTLY IF NOT EXISTS big_map_diff_level_idx ON ?TableName (level) + CREATE INDEX IF NOT EXISTS big_map_diff_level_idx ON ?TableName (level) `); err != nil { logger.Error().Err(err).Msg("can't create index") } if _, err := bi.Context.StorageDB.DB.Model((*bigmapdiff.BigMapDiff)(nil)).Exec(` - CREATE INDEX CONCURRENTLY IF NOT EXISTS big_map_diff_protocol_idx ON ?TableName (protocol_id) + CREATE INDEX IF NOT EXISTS big_map_diff_protocol_idx ON ?TableName (protocol_id) `); err != nil { logger.Error().Err(err).Msg("can't create index") } @@ -154,55 +154,67 @@ func (bi *BlockchainIndexer) createIndices() { // Operations if _, err := bi.Context.StorageDB.DB.Model((*operation.Operation)(nil)).Exec(` - CREATE INDEX CONCURRENTLY IF NOT EXISTS operations_level_idx ON ?TableName (level) + CREATE INDEX IF NOT EXISTS operations_level_idx ON ?TableName (level) `); err != nil { logger.Error().Err(err).Msg("can't 
create index") } if _, err := bi.Context.StorageDB.DB.Model((*operation.Operation)(nil)).Exec(` - CREATE INDEX CONCURRENTLY IF NOT EXISTS operations_source_idx ON ?TableName (source_id) + CREATE INDEX IF NOT EXISTS operations_source_idx ON ?TableName (source_id) `); err != nil { logger.Error().Err(err).Msg("can't create index") } if _, err := bi.Context.StorageDB.DB.Model((*operation.Operation)(nil)).Exec(` - CREATE INDEX CONCURRENTLY IF NOT EXISTS operations_opg_idx ON ?TableName (hash, counter, content_index) + CREATE INDEX IF NOT EXISTS operations_opg_idx ON ?TableName (hash, counter, content_index) `); err != nil { logger.Error().Err(err).Msg("can't create index") } if _, err := bi.Context.StorageDB.DB.Model((*operation.Operation)(nil)).Exec(` - CREATE INDEX CONCURRENTLY IF NOT EXISTS operations_entrypoint_idx ON ?TableName (entrypoint) + CREATE INDEX IF NOT EXISTS operations_entrypoint_idx ON ?TableName (entrypoint) `); err != nil { logger.Error().Err(err).Msg("can't create index") } if _, err := bi.Context.StorageDB.DB.Model((*operation.Operation)(nil)).Exec(` - CREATE INDEX CONCURRENTLY IF NOT EXISTS operations_hash_idx ON ?TableName (hash) + CREATE INDEX IF NOT EXISTS operations_hash_idx ON ?TableName (hash) `); err != nil { logger.Error().Err(err).Msg("can't create index") } if _, err := bi.Context.StorageDB.DB.Model((*operation.Operation)(nil)).Exec(` - CREATE INDEX CONCURRENTLY IF NOT EXISTS operations_opg_with_nonce_idx ON ?TableName (hash, counter, nonce) + CREATE INDEX IF NOT EXISTS operations_opg_with_nonce_idx ON ?TableName (hash, counter, nonce) `); err != nil { logger.Error().Err(err).Msg("can't create index") } if _, err := bi.Context.StorageDB.DB.Model((*operation.Operation)(nil)).Exec(` - CREATE INDEX CONCURRENTLY IF NOT EXISTS operations_sort_idx ON ?TableName (level, counter, id) + CREATE INDEX IF NOT EXISTS operations_sort_idx ON ?TableName (level, counter, id) `); err != nil { logger.Error().Err(err).Msg("can't create index") } if _, err := bi.Context.StorageDB.DB.Model((*operation.Operation)(nil)).Exec(` - CREATE INDEX CONCURRENTLY IF NOT EXISTS operations_timestamp_idx ON ?TableName (timestamp) + CREATE INDEX IF NOT EXISTS operations_timestamp_idx ON ?TableName (timestamp) `); err != nil { logger.Error().Err(err).Msg("can't create index") } if _, err := bi.Context.StorageDB.DB.Model((*operation.Operation)(nil)).Exec(` - CREATE INDEX CONCURRENTLY IF NOT EXISTS operations_kind_idx ON ?TableName (kind) + CREATE INDEX IF NOT EXISTS operations_kind_idx ON ?TableName (kind) + `); err != nil { + logger.Error().Err(err).Msg("can't create index") + } + + if _, err := bi.Context.StorageDB.DB.Model((*operation.Operation)(nil)).Exec(` + CREATE INDEX IF NOT EXISTS operations_destination_timestamp_idx ON ?TableName (destination_id, timestamp) + `); err != nil { + logger.Error().Err(err).Msg("can't create index") + } + + if _, err := bi.Context.StorageDB.DB.Model((*operation.Operation)(nil)).Exec(` + CREATE INDEX IF NOT EXISTS operations_source_timestamp_idx ON ?TableName (source_id, timestamp) `); err != nil { logger.Error().Err(err).Msg("can't create index") } diff --git a/configs/development.yml b/configs/development.yml index d39803963..fc7d0c049 100644 --- a/configs/development.yml +++ b/configs/development.yml @@ -33,6 +33,7 @@ storage: password: ${POSTGRES_PASSWORD} sslmode: disable timeout: 10 + log_queries: ${POSTGRES_LOG_QUERIES:-false} sentry: environment: development @@ -71,10 +72,10 @@ indexer: project_name: indexer sentry_enabled: false networks: - # mainnet: - # 
receiver_threads: 5 - # ghostnet: - # receiver_threads: 10 + mainnet: + receiver_threads: 5 + ghostnet: + receiver_threads: 10 nairobinet: receiver_threads: 10 connections: diff --git a/configs/production.yml b/configs/production.yml index d40b4a0a8..f92aa5f3d 100644 --- a/configs/production.yml +++ b/configs/production.yml @@ -34,6 +34,7 @@ storage: password: ${POSTGRES_PASSWORD} sslmode: disable timeout: 10 + log_queries: ${POSTGRES_LOG_QUERIES:-false} sentry: environment: production diff --git a/docker-compose.yml b/docker-compose.yml index b6e75f6d5..53e1e43f9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,7 +13,7 @@ services: ports: - 127.0.0.1:${POSTGRES_PORT}:5432 healthcheck: - test: ["CMD-SHELL", "pg_isready -U postgres"] + test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER}"] interval: 10s timeout: 5s retries: 5 diff --git a/internal/bcd/ast/map.go b/internal/bcd/ast/map.go index 4ed6b4546..f9d0f1e11 100644 --- a/internal/bcd/ast/map.go +++ b/internal/bcd/ast/map.go @@ -461,8 +461,8 @@ func (m *Map) Range(handler func(node Node) error) error { return m.ValueType.Range(handler) } -// GetDefaultJSONModel - -func (m *Map) GetDefaultJSONModel(model JSONModel) { +// GetJSONModel - +func (m *Map) GetJSONModel(model JSONModel) { if model == nil { return } diff --git a/internal/bcd/encoding/base58.go b/internal/bcd/encoding/base58.go index 5ce1cfa90..d6d1a937e 100644 --- a/internal/bcd/encoding/base58.go +++ b/internal/bcd/encoding/base58.go @@ -177,6 +177,20 @@ func DecodeBase58ToString(data string) (string, error) { return hex.EncodeToString(decoded[len(enc.DecodedPrefix):]), nil } +// MustDecodeBase58 - +func MustDecodeBase58(data string) []byte { + decoded, err := base58Enc.CheckDecode(data) + if err != nil { + panic(err) + } + enc, err := getBase58EncodingForDecode(decoded) + if err != nil { + panic(err) + } + + return decoded[len(enc.DecodedPrefix):] +} + // DecodeBase58String - func DecodeBase58String(data string) (string, error) { b, err := DecodeBase58(data) @@ -203,3 +217,17 @@ func EncodeBase58String(data string, prefix []byte) (string, error) { } return EncodeBase58(b, prefix) } + +// MustEncodeBase58 - +func MustEncodeBase58(data, prefix []byte) string { + enc, err := getBase58EncodingForEncode(data, prefix) + if err != nil { + panic(err) + } + return base58Enc.CheckEncode(append(enc.DecodedPrefix, data...)) +} + +// MustEncodeOperationHash - +func MustEncodeOperationHash(data []byte) string { + return MustEncodeBase58(data, []byte(PrefixOperationHash)) +} diff --git a/internal/config/config.go b/internal/config/config.go index 67d5b5c22..82a9c5d32 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -72,8 +72,9 @@ type ServiceConfig struct { // StorageConfig - type StorageConfig struct { - Postgres PostgresConfig `yaml:"pg"` - Timeout int `yaml:"timeout"` + Postgres PostgresConfig `yaml:"pg"` + Timeout int `yaml:"timeout"` + LogQueries bool `yaml:"log_queries,omitempty"` } // PostgresConfig - diff --git a/internal/config/context.go b/internal/config/context.go index 3e17d96e5..18ac661c4 100644 --- a/internal/config/context.go +++ b/internal/config/context.go @@ -16,6 +16,7 @@ import ( "github.com/baking-bad/bcdhub/internal/models/ticket" "github.com/baking-bad/bcdhub/internal/models/types" "github.com/baking-bad/bcdhub/internal/noderpc" + "github.com/baking-bad/bcdhub/internal/postgres" "github.com/baking-bad/bcdhub/internal/postgres/core" 
"github.com/baking-bad/bcdhub/internal/services/mempool" "github.com/microcosm-cc/bluemonday" @@ -46,6 +47,7 @@ type Context struct { TicketUpdates ticket.Repository Domains domains.Repository Scripts contract.ScriptRepository + Partitions *postgres.PartitionManager Cache *cache.Cache Sanitizer *bluemonday.Policy diff --git a/internal/config/options.go b/internal/config/options.go index 71dc2141a..0668b7840 100644 --- a/internal/config/options.go +++ b/internal/config/options.go @@ -4,6 +4,7 @@ import ( "time" "github.com/baking-bad/bcdhub/internal/bcd/tezerrors" + "github.com/baking-bad/bcdhub/internal/postgres" "github.com/baking-bad/bcdhub/internal/postgres/account" "github.com/baking-bad/bcdhub/internal/postgres/bigmapdiff" "github.com/baking-bad/bcdhub/internal/postgres/contract" @@ -49,13 +50,19 @@ func WithStorage(cfg StorageConfig, appName string, maxPageSize int64, maxConnCo panic("Please set connection strings to storage in config") } - conn := pgCore.WaitNew( - cfg.Postgres.ConnectionString(), ctx.Network.String(), - appName, cfg.Timeout, + opts := []pgCore.PostgresOption{ pgCore.WithPageSize(maxPageSize), pgCore.WithIdleConnections(idleConnCount), pgCore.WithMaxConnections(maxConnCount), - // pgCore.WithQueryLogging(), + } + + if cfg.LogQueries { + opts = append(opts, pgCore.WithQueryLogging()) + } + + conn := pgCore.WaitNew( + cfg.Postgres.ConnectionString(), ctx.Network.String(), + appName, cfg.Timeout, opts..., ) contractStorage := contract.NewStorage(conn) @@ -73,6 +80,7 @@ func WithStorage(cfg StorageConfig, appName string, maxPageSize int64, maxConnCo ctx.Domains = domains.NewStorage(conn) ctx.TicketUpdates = ticket.NewStorage(conn) ctx.Scripts = contractStorage + ctx.Partitions = postgres.NewPartitionManager(conn) } } diff --git a/internal/helpers/quarters.go b/internal/helpers/quarters.go new file mode 100644 index 000000000..07575b317 --- /dev/null +++ b/internal/helpers/quarters.go @@ -0,0 +1,38 @@ +package helpers + +import ( + "errors" + "time" +) + +// QuarterOf - +func QuarterOf(month time.Month) int { + return (int(month) + 2) / 3 +} + +// QuarterBoundaries - +func QuarterBoundaries(current time.Time) (time.Time, time.Time, error) { + year := current.Year() + quarter := QuarterOf(current.Month()) + + switch quarter { + case 1: + start := time.Date(year, time.January, 1, 0, 0, 0, 0, time.UTC) + end := start.AddDate(0, 3, 0) + return start, end, nil + case 2: + start := time.Date(year, time.April, 1, 0, 0, 0, 0, time.UTC) + end := start.AddDate(0, 3, 0) + return start, end, nil + case 3: + start := time.Date(year, time.July, 1, 0, 0, 0, 0, time.UTC) + end := start.AddDate(0, 3, 0) + return start, end, nil + case 4: + start := time.Date(year, time.October, 1, 0, 0, 0, 0, time.UTC) + end := start.AddDate(0, 3, 0) + return start, end, nil + } + + return time.Now(), time.Now(), errors.New("invalid quarter") +} diff --git a/internal/models/bigmapdiff/model.go b/internal/models/bigmapdiff/model.go index d43e68a19..d1cf4f737 100644 --- a/internal/models/bigmapdiff/model.go +++ b/internal/models/bigmapdiff/model.go @@ -10,17 +10,17 @@ import ( // BigMapDiff - type BigMapDiff struct { // nolint - tableName struct{} `pg:"big_map_diffs"` + tableName struct{} `pg:"big_map_diffs,partition_by:RANGE(timestamp)"` - ID int64 + ID int64 `pg:",pk"` Ptr int64 `pg:",use_zero"` Key types.Bytes `pg:",notnull,type:bytea"` KeyHash string Value types.Bytes `pg:",type:bytea"` Level int64 Contract string - Timestamp time.Time - 
ProtocolID int64 `pg:",type:SMALLINT"` + Timestamp time.Time `pg:",pk"` + ProtocolID int64 `pg:",type:SMALLINT"` OperationID int64 } @@ -37,7 +37,7 @@ func (b *BigMapDiff) GetIndex() string { // Save - func (b *BigMapDiff) Save(tx pg.DBI) error { _, err := tx.Model(b). - OnConflict("(id) DO UPDATE"). + OnConflict("(id, timestamp) DO UPDATE"). Set(` ptr = excluded.ptr, key = excluded.key, diff --git a/internal/models/migration/model.go b/internal/models/migration/model.go index e74d376e2..d79cae752 100644 --- a/internal/models/migration/model.go +++ b/internal/models/migration/model.go @@ -16,7 +16,7 @@ type Migration struct { ID int64 ProtocolID int64 `pg:",type:SMALLINT"` PrevProtocolID int64 - Hash string + Hash []byte Timestamp time.Time Level int64 Kind types.MigrationKind `pg:",type:SMALLINT"` diff --git a/internal/models/mock/operation/mock.go b/internal/models/mock/operation/mock.go index b0122b4f7..8c29ccd72 100644 --- a/internal/models/mock/operation/mock.go +++ b/internal/models/mock/operation/mock.go @@ -65,7 +65,7 @@ func (mr *MockRepositoryMockRecorder) Last(filter, lastID interface{}) *gomock.C } // GetByHash mocks base method -func (m *MockRepository) GetByHash(hash string) ([]model.Operation, error) { +func (m *MockRepository) GetByHash(hash []byte) ([]model.Operation, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetByHash", hash) ret0, _ := ret[0].([]model.Operation) @@ -80,7 +80,7 @@ func (mr *MockRepositoryMockRecorder) GetByHash(hash interface{}) *gomock.Call { } // GetByHashAndCounter mocks base method -func (m *MockRepository) GetByHashAndCounter(hash string, counter int64) ([]model.Operation, error) { +func (m *MockRepository) GetByHashAndCounter(hash []byte, counter int64) ([]model.Operation, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetByHashAndCounter", hash, counter) ret0, _ := ret[0].([]model.Operation) diff --git a/internal/models/operation/data.go b/internal/models/operation/data.go index 5c4bb5102..147fd4225 100644 --- a/internal/models/operation/data.go +++ b/internal/models/operation/data.go @@ -22,7 +22,7 @@ type OPG struct { TotalCost int64 Flow int64 Internals int - Hash string + Hash []byte Entrypoint string Timestamp time.Time Status types.OperationStatus diff --git a/internal/models/operation/model.go b/internal/models/operation/model.go index fd756cb8c..7e36fbf4d 100644 --- a/internal/models/operation/model.go +++ b/internal/models/operation/model.go @@ -1,6 +1,7 @@ package operation import ( + "encoding/hex" "time" "github.com/baking-bad/bcdhub/internal/bcd" @@ -18,9 +19,9 @@ import ( // Operation - type Operation struct { // nolint - tableName struct{} `pg:"operations"` + tableName struct{} `pg:"operations,partition_by:RANGE(timestamp)"` - ID int64 + ID int64 `pg:",pk"` ContentIndex int64 `pg:",use_zero"` Level int64 `pg:",use_zero"` Counter int64 `pg:",use_zero"` @@ -46,13 +47,13 @@ type Operation struct { DelegateID int64 Delegate account.Account `pg:",rel:has-one"` - Timestamp time.Time + Timestamp time.Time `pg:",pk,notnull"` Status types.OperationStatus `pg:",type:SMALLINT"` Kind types.OperationKind `pg:",type:SMALLINT"` Entrypoint types.NullString `pg:",type:text"` Tag types.NullString `pg:",type:text"` - Hash string + Hash []byte Parameters []byte DeffatedStorage []byte EventPayload []byte @@ -90,7 +91,7 @@ func (o *Operation) Save(tx pg.DBI) error { // LogFields - func (o *Operation) LogFields() map[string]interface{} { return map[string]interface{}{ - "hash": o.Hash, + "hash": hex.EncodeToString(o.Hash), "block": 
o.Level, } } @@ -136,7 +137,7 @@ func (o *Operation) IsTransaction() bool { // IsImplicit - func (o *Operation) IsImplicit() bool { - return o.Hash == "" + return len(o.Hash) == 0 } // IsApplied - diff --git a/internal/models/operation/repository.go b/internal/models/operation/repository.go index 53c23ca4c..bc8484f3f 100644 --- a/internal/models/operation/repository.go +++ b/internal/models/operation/repository.go @@ -9,8 +9,8 @@ type Repository interface { GetByAccount(acc account.Account, size uint64, filters map[string]interface{}) (Pageable, error) // Last - get last operation by `filters` with not empty deffated_storage. Last(filter map[string]interface{}, lastID int64) (Operation, error) - GetByHash(hash string) ([]Operation, error) - GetByHashAndCounter(hash string, counter int64) ([]Operation, error) + GetByHash(hash []byte) ([]Operation, error) + GetByHashAndCounter(hash []byte, counter int64) ([]Operation, error) GetImplicitOperation(counter int64) (Operation, error) OPG(address string, size, lastID int64) ([]OPG, error) Origination(accountID int64) (Operation, error) diff --git a/internal/noderpc/errors.go b/internal/noderpc/errors.go index 8bbb16116..65254a5b6 100644 --- a/internal/noderpc/errors.go +++ b/internal/noderpc/errors.go @@ -94,5 +94,5 @@ func (e InvalidNodeResponse) Is(target error) bool { // Errors var ( ErrInvalidStatusCode = errors.New("invalid status code") - ErrNodeRPCError = "Node RPC error" + ErrNodeRPCError = errors.New("Node RPC error") ) diff --git a/internal/noderpc/pool.go b/internal/noderpc/pool.go index b0c36ddae..762cce50a 100644 --- a/internal/noderpc/pool.go +++ b/internal/noderpc/pool.go @@ -58,7 +58,6 @@ func NewWaitPool(urls []string, opts ...NodeOption) Pool { } func (p Pool) getNode() (*poolItem, error) { - rand.Seed(time.Now().UnixNano()) nodes := make([]*poolItem, 0) for i := range p { // if p[i].isBlocked() { diff --git a/internal/noderpc/rpc.go b/internal/noderpc/rpc.go index bf40c80d4..0f8240b47 100644 --- a/internal/noderpc/rpc.go +++ b/internal/noderpc/rpc.go @@ -112,7 +112,7 @@ func (rpc *NodeRPC) checkStatusCode(resp *http.Response, checkStatusCode bool) e func (rpc *NodeRPC) parseResponse(resp *http.Response, checkStatusCode bool, uri string, response interface{}) error { if err := rpc.checkStatusCode(resp, checkStatusCode); err != nil { - return errors.Wrapf(err, "%s (%s)", ErrNodeRPCError, uri) + return fmt.Errorf("%w (%s): %w", ErrNodeRPCError, uri, err) } return json.NewDecoder(resp.Body).Decode(response) @@ -185,7 +185,7 @@ func (rpc *NodeRPC) getRaw(ctx context.Context, uri string) ([]byte, error) { defer resp.Body.Close() if err := rpc.checkStatusCode(resp, true); err != nil { - return nil, errors.Wrapf(err, "%s (%s)", ErrNodeRPCError, uri) + return nil, fmt.Errorf("%w (%s): %w", ErrNodeRPCError, uri, err) } return io.ReadAll(resp.Body) } @@ -198,7 +198,7 @@ func (rpc *NodeRPC) post(ctx context.Context, uri string, data interface{}, chec } defer resp.Body.Close() - return rpc.parseResponse(resp, checkStatusCode, "", response) + return rpc.parseResponse(resp, checkStatusCode, uri, response) } // Block - returns block diff --git a/internal/parsers/operations/migration_test.go b/internal/parsers/operations/migration_test.go index df5eb91ab..a3981df7d 100644 --- a/internal/parsers/operations/migration_test.go +++ b/internal/parsers/operations/migration_test.go @@ -38,7 +38,7 @@ func TestMigration_Parse(t *testing.T) { Address: "destination", }, Timestamp: timestamp, - Hash: "hash", + Hash: []byte("hash"), }, fileName: 
"./data/migration/test1.json", want: nil, @@ -51,14 +51,14 @@ func TestMigration_Parse(t *testing.T) { Address: "destination", }, Timestamp: timestamp, - Hash: "hash", + Hash: []byte("hash"), }, fileName: "./data/migration/test2.json", want: &migration.Migration{ Level: 123, ProtocolID: 2, Timestamp: timestamp, - Hash: "hash", + Hash: []byte("hash"), Kind: types.MigrationKindLambda, }, }, diff --git a/internal/parsers/operations/operation_group.go b/internal/parsers/operations/operation_group.go index 254402e01..952b5d9e2 100644 --- a/internal/parsers/operations/operation_group.go +++ b/internal/parsers/operations/operation_group.go @@ -3,6 +3,7 @@ package operations import ( "github.com/baking-bad/bcdhub/internal/bcd" "github.com/baking-bad/bcdhub/internal/bcd/consts" + "github.com/baking-bad/bcdhub/internal/bcd/encoding" "github.com/baking-bad/bcdhub/internal/helpers" "github.com/baking-bad/bcdhub/internal/noderpc" "github.com/baking-bad/bcdhub/internal/parsers" @@ -20,8 +21,14 @@ func NewGroup(params *ParseParams) Group { // Parse - func (opg Group) Parse(data noderpc.LightOperationGroup, store parsers.Store) error { - opg.hash = data.Hash - helpers.SetTagSentry("hash", opg.hash) + helpers.SetTagSentry("hash", data.Hash) + if data.Hash != "" { + hash, err := encoding.DecodeBase58(data.Hash) + if err != nil { + return err + } + opg.hash = hash + } for idx, item := range data.Contents { opg.contentIdx = int64(idx) diff --git a/internal/parsers/operations/operation_group_test.go b/internal/parsers/operations/operation_group_test.go index 6dd03b3e9..f6cf5b186 100644 --- a/internal/parsers/operations/operation_group_test.go +++ b/internal/parsers/operations/operation_group_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/baking-bad/bcdhub/internal/bcd" + "github.com/baking-bad/bcdhub/internal/bcd/encoding" "github.com/baking-bad/bcdhub/internal/cache" "github.com/baking-bad/bcdhub/internal/config" "github.com/baking-bad/bcdhub/internal/models" @@ -469,7 +470,7 @@ func TestGroup_Parse(t *testing.T) { Delegate: account.Account{}, Status: types.OperationStatusApplied, Level: 1068669, - Hash: "opJXaAMkBrAbd1XFd23kS8vXiw63tU4rLUcLrZgqUCpCbhT1Pn9", + Hash: encoding.MustDecodeBase58("opJXaAMkBrAbd1XFd23kS8vXiw63tU4rLUcLrZgqUCpCbhT1Pn9"), Entrypoint: types.NullString{ Str: "transfer", Valid: true, @@ -519,7 +520,7 @@ func TestGroup_Parse(t *testing.T) { Status: types.OperationStatusApplied, Level: 1068669, Counter: 5791164, - Hash: "opJXaAMkBrAbd1XFd23kS8vXiw63tU4rLUcLrZgqUCpCbhT1Pn9", + Hash: encoding.MustDecodeBase58("opJXaAMkBrAbd1XFd23kS8vXiw63tU4rLUcLrZgqUCpCbhT1Pn9"), Nonce: getInt64Pointer(0), Entrypoint: types.NullString{ Str: "validateAccounts", @@ -548,7 +549,7 @@ func TestGroup_Parse(t *testing.T) { Status: types.OperationStatusApplied, Level: 1068669, Counter: 5791164, - Hash: "opJXaAMkBrAbd1XFd23kS8vXiw63tU4rLUcLrZgqUCpCbhT1Pn9", + Hash: encoding.MustDecodeBase58("opJXaAMkBrAbd1XFd23kS8vXiw63tU4rLUcLrZgqUCpCbhT1Pn9"), Nonce: getInt64Pointer(1), Entrypoint: types.NullString{ Str: "validateRules", @@ -630,7 +631,7 @@ func TestGroup_Parse(t *testing.T) { { ContentIndex: 0, ProtocolID: 1, - Hash: "opPUPCpQu6pP38z9TkgFfwLiqVBFGSWQCH8Z2PUL3jrpxqJH5gt", + Hash: encoding.MustDecodeBase58("opPUPCpQu6pP38z9TkgFfwLiqVBFGSWQCH8Z2PUL3jrpxqJH5gt"), Internal: false, Nonce: nil, Status: types.OperationStatusApplied, @@ -674,7 +675,7 @@ func TestGroup_Parse(t *testing.T) { }, { ContentIndex: 0, ProtocolID: 1, - 
Hash: "opPUPCpQu6pP38z9TkgFfwLiqVBFGSWQCH8Z2PUL3jrpxqJH5gt", + Hash: encoding.MustDecodeBase58("opPUPCpQu6pP38z9TkgFfwLiqVBFGSWQCH8Z2PUL3jrpxqJH5gt"), Internal: true, Nonce: getInt64Pointer(0), Status: types.OperationStatusApplied, @@ -814,7 +815,7 @@ func TestGroup_Parse(t *testing.T) { { ContentIndex: 0, ProtocolID: 0, - Hash: "onzUDQhwunz2yqzfEsoURXEBz9p7Gk8DgY4QBva52Z4b3AJCZjt", + Hash: encoding.MustDecodeBase58("onzUDQhwunz2yqzfEsoURXEBz9p7Gk8DgY4QBva52Z4b3AJCZjt"), Internal: false, Status: types.OperationStatusApplied, Timestamp: timestamp, @@ -928,7 +929,7 @@ func TestGroup_Parse(t *testing.T) { Status: types.OperationStatusApplied, Level: 301436, - Hash: "onv6Q1dNejAGEJeQzwRannWsDSGw85FuFdhLnBrY18TBcC9p8kC", + Hash: encoding.MustDecodeBase58("onv6Q1dNejAGEJeQzwRannWsDSGw85FuFdhLnBrY18TBcC9p8kC"), Timestamp: timestamp, Burned: 331000, Initiator: account.Account{ @@ -1027,7 +1028,7 @@ func TestGroup_Parse(t *testing.T) { }, Status: types.OperationStatusApplied, Level: 72207, - Hash: "op4fFMvYsxvSUKZmLWC7aUf25VMYqigaDwTZCAoBBi8zACbHTNg", + Hash: encoding.MustDecodeBase58("op4fFMvYsxvSUKZmLWC7aUf25VMYqigaDwTZCAoBBi8zACbHTNg"), Timestamp: timestamp, Entrypoint: types.NullString{ Str: "@entrypoint_1", @@ -1081,7 +1082,7 @@ func TestGroup_Parse(t *testing.T) { }, Status: types.OperationStatusApplied, Level: 72207, - Hash: "op4fFMvYsxvSUKZmLWC7aUf25VMYqigaDwTZCAoBBi8zACbHTNg", + Hash: encoding.MustDecodeBase58("op4fFMvYsxvSUKZmLWC7aUf25VMYqigaDwTZCAoBBi8zACbHTNg"), Timestamp: timestamp, Burned: 5245000, Counter: 155670, @@ -1212,7 +1213,7 @@ func TestGroup_Parse(t *testing.T) { Status: types.OperationStatusApplied, Level: 1516349, - Hash: "ooz1bkCQeYsZYP7vb4Dx7pYPRpWN11Z3G3yP1v4HAfdNXuHRv9c", + Hash: encoding.MustDecodeBase58("ooz1bkCQeYsZYP7vb4Dx7pYPRpWN11Z3G3yP1v4HAfdNXuHRv9c"), Timestamp: timestamp, Entrypoint: types.NullString{ Str: "transfer", @@ -1309,7 +1310,7 @@ func TestGroup_Parse(t *testing.T) { Operations: []*operation.Operation{ { Kind: types.OperationKindTransaction, - Hash: "oocFt4vkkgQGfoRH54328cJUbDdWvj3x6KEs5Arm4XhqwwJmnJ8", + Hash: encoding.MustDecodeBase58("oocFt4vkkgQGfoRH54328cJUbDdWvj3x6KEs5Arm4XhqwwJmnJ8"), Source: account.Account{ Address: "tz1WKygtstVY96oyc6Rmk945dMf33LeihgWT", @@ -1345,7 +1346,7 @@ func TestGroup_Parse(t *testing.T) { }, { Kind: types.OperationKindTransaction, - Hash: "oocFt4vkkgQGfoRH54328cJUbDdWvj3x6KEs5Arm4XhqwwJmnJ8", + Hash: encoding.MustDecodeBase58("oocFt4vkkgQGfoRH54328cJUbDdWvj3x6KEs5Arm4XhqwwJmnJ8"), Internal: true, Timestamp: timestamp, Status: types.OperationStatusApplied, @@ -1427,7 +1428,7 @@ func TestGroup_Parse(t *testing.T) { Operations: []*operation.Operation{ { Kind: types.OperationKindRegisterGlobalConstant, - Hash: "ooffKPL6WmMgqzLGtRtLp2HdEbVL3K2fVzKQLyxsBFMC84wpjRt", + Hash: encoding.MustDecodeBase58("ooffKPL6WmMgqzLGtRtLp2HdEbVL3K2fVzKQLyxsBFMC84wpjRt"), Source: account.Account{ Address: "tz1SMARcpWCydHsGgz4MRoK9NkbpBmmUAfNe", Type: types.AccountTypeTz, @@ -1499,7 +1500,7 @@ func TestGroup_Parse(t *testing.T) { Operations: []*operation.Operation{ { Kind: types.OperationKindTransaction, - Hash: "oozvzXiZmVW9QtYjKmDuYqoHNCEvt32FwM2cUgQee2S1SGWgumA", + Hash: encoding.MustDecodeBase58("oozvzXiZmVW9QtYjKmDuYqoHNCEvt32FwM2cUgQee2S1SGWgumA"), Source: account.Account{ Address: "tz1RiUE3Ao53juAz4uDYx1J3tHJMye6jPfhp", Type: types.AccountTypeTz, @@ -1541,7 +1542,7 @@ func TestGroup_Parse(t *testing.T) { }, }, { Kind: types.OperationKindTransaction, - Hash: "oozvzXiZmVW9QtYjKmDuYqoHNCEvt32FwM2cUgQee2S1SGWgumA", + Hash: 
encoding.MustDecodeBase58("oozvzXiZmVW9QtYjKmDuYqoHNCEvt32FwM2cUgQee2S1SGWgumA"), Source: account.Account{ Address: "KT1Jk8LRDoj6LkopYZwRq5ZEWBhYv8nVc6e6", Type: types.AccountTypeContract, @@ -1587,7 +1588,7 @@ func TestGroup_Parse(t *testing.T) { }, }, { Kind: types.OperationKindTransaction, - Hash: "oozvzXiZmVW9QtYjKmDuYqoHNCEvt32FwM2cUgQee2S1SGWgumA", + Hash: encoding.MustDecodeBase58("oozvzXiZmVW9QtYjKmDuYqoHNCEvt32FwM2cUgQee2S1SGWgumA"), Source: account.Account{ Address: "KT1Jk8LRDoj6LkopYZwRq5ZEWBhYv8nVc6e6", Type: types.AccountTypeContract, @@ -1637,7 +1638,7 @@ func TestGroup_Parse(t *testing.T) { }, }, { Kind: types.OperationKindOrigination, - Hash: "oozvzXiZmVW9QtYjKmDuYqoHNCEvt32FwM2cUgQee2S1SGWgumA", + Hash: encoding.MustDecodeBase58("oozvzXiZmVW9QtYjKmDuYqoHNCEvt32FwM2cUgQee2S1SGWgumA"), Source: account.Account{ Address: "KT1Jk8LRDoj6LkopYZwRq5ZEWBhYv8nVc6e6", Type: types.AccountTypeContract, @@ -2119,7 +2120,7 @@ func TestGroup_Parse(t *testing.T) { }, Status: types.OperationStatusApplied, Level: 707452, - Hash: "opPDkVe1nU5xqLyoWYQ2r6H7PaJM5S4Pe4WtTmEE7UMQAwfnuiJ", + Hash: encoding.MustDecodeBase58("opPDkVe1nU5xqLyoWYQ2r6H7PaJM5S4Pe4WtTmEE7UMQAwfnuiJ"), Timestamp: timestamp, Initiator: account.Account{ Address: "tz1VSUr8wwNhLAzempoch5d6hLRiTh8Cjcjb", diff --git a/internal/parsers/operations/params.go b/internal/parsers/operations/params.go index a520dd00b..e3f6de0a2 100644 --- a/internal/parsers/operations/params.go +++ b/internal/parsers/operations/params.go @@ -19,7 +19,7 @@ type ParseParams struct { stackTrace *stacktrace.StackTrace - hash string + hash []byte head noderpc.Header contentIdx int64 main *operation.Operation @@ -39,7 +39,7 @@ func WithProtocol(protocol *protocol.Protocol) ParseParamsOption { } // WithHash - -func WithHash(hash string) ParseParamsOption { +func WithHash(hash []byte) ParseParamsOption { return func(dp *ParseParams) { dp.hash = hash } diff --git a/internal/parsers/operations/test_common.go b/internal/parsers/operations/test_common.go index 77cc5c77c..4699ae600 100644 --- a/internal/parsers/operations/test_common.go +++ b/internal/parsers/operations/test_common.go @@ -223,7 +223,7 @@ func compareOperations(t *testing.T, one, two *operation.Operation) bool { logger.Info().Msgf("Protocol: %d != %d", one.ProtocolID, two.ProtocolID) return false } - if one.Hash != two.Hash { + if !bytes.Equal(one.Hash, two.Hash) { logger.Info().Msgf("Hash: %s != %s", one.Hash, two.Hash) return false } diff --git a/internal/parsers/store.go b/internal/parsers/store.go index 5a6426c08..9b939119a 100644 --- a/internal/parsers/store.go +++ b/internal/parsers/store.go @@ -1,6 +1,8 @@ package parsers import ( + "context" + "github.com/baking-bad/bcdhub/internal/models/bigmapdiff" "github.com/baking-bad/bcdhub/internal/models/contract" "github.com/baking-bad/bcdhub/internal/models/migration" @@ -16,7 +18,7 @@ type Store interface { AddGlobalConstants(constants ...*contract.GlobalConstant) ListContracts() []*contract.Contract ListOperations() []*operation.Operation - Save() error + Save(ctx context.Context) error } // TestStore - @@ -75,6 +77,6 @@ func (store *TestStore) ListOperations() []*operation.Operation { } // Save - -func (store *TestStore) Save() error { +func (store *TestStore) Save(ctx context.Context) error { return nil } diff --git a/internal/postgres/bigmapdiff/storage.go b/internal/postgres/bigmapdiff/storage.go index f4ada3901..3cfa07f09 100644 --- a/internal/postgres/bigmapdiff/storage.go +++ b/internal/postgres/bigmapdiff/storage.go 
@@ -158,38 +158,34 @@ func (storage *Storage) Get(ctx bigmapdiff.GetContext) ([]bigmapdiff.Bucket, err // GetStats - func (storage *Storage) GetStats(ptr int64) (stats bigmapdiff.Stats, err error) { - totalQuery := storage.DB.Model().Table(models.DocBigMapState). - ColumnExpr("count(contract) as count, contract, 'total' as name"). + total, err := storage.DB.Model((*bigmapdiff.BigMapState)(nil)). Where("ptr = ?", ptr). - Group("contract") + Count() + if err != nil { + return stats, err + } - activeQuery := storage.DB.Model().Table(models.DocBigMapState). - ColumnExpr("count(contract) as count, contract, 'active' as name"). + active, err := storage.DB.Model((*bigmapdiff.BigMapState)(nil)). Where("ptr = ?", ptr). Where("removed = false"). - Group("contract") - - type row struct { - Count int64 - Contract string - Name string - } - var rows []row - - if _, err = storage.DB.Model().Query(&rows, "(?) union all (?)", totalQuery, activeQuery); err != nil { - return + Count() + if err != nil { + return stats, err } - for i := range rows { - switch rows[i].Name { - case "active": - stats.Active = rows[i].Count - case "total": - stats.Total = rows[i].Count + if err := storage.DB.Model((*bigmapdiff.BigMapState)(nil)). + Column("contract"). + Where("ptr = ?", ptr). + Limit(1). + Select(&stats.Contract); err != nil { + if !storage.IsRecordNotFound(err) { + return stats, err } - stats.Contract = rows[i].Contract } + stats.Active = int64(active) + stats.Total = int64(total) + return } diff --git a/internal/postgres/core/scopes.go b/internal/postgres/core/scopes.go index 59d7a0812..8ec09858c 100644 --- a/internal/postgres/core/scopes.go +++ b/internal/postgres/core/scopes.go @@ -1,6 +1,8 @@ package core import ( + "time" + "github.com/go-pg/pg/v10/orm" ) @@ -40,3 +42,33 @@ func Token(contract string, tokenID uint64) func(db *orm.Query) *orm.Query { var EmptyRelation = func(q *orm.Query) (*orm.Query, error) { return q, nil } + +// TimestampFilter - +type TimestampFilter struct { + Gt time.Time + Gte time.Time + Lt time.Time + Lte time.Time +} + +// Apply - +func (tf TimestampFilter) Apply(q *orm.Query) *orm.Query { + if q == nil { + return q + } + + if !tf.Gt.IsZero() { + q = q.Where("timestamp > ?", tf.Gt) + } + if !tf.Gte.IsZero() { + q = q.Where("timestamp >= ?", tf.Gte) + } + if !tf.Lt.IsZero() { + q = q.Where("timestamp < ?", tf.Lt) + } + if !tf.Lte.IsZero() { + q = q.Where("timestamp <= ?", tf.Lte) + } + + return q +} diff --git a/internal/postgres/operation/storage.go b/internal/postgres/operation/storage.go index b0e715992..9ab7f1029 100644 --- a/internal/postgres/operation/storage.go +++ b/internal/postgres/operation/storage.go @@ -4,8 +4,12 @@ import ( "fmt" "time" + "github.com/baking-bad/bcdhub/internal/bcd" + "github.com/baking-bad/bcdhub/internal/bcd/consts" + "github.com/baking-bad/bcdhub/internal/helpers" "github.com/baking-bad/bcdhub/internal/models" "github.com/baking-bad/bcdhub/internal/models/account" + "github.com/baking-bad/bcdhub/internal/models/contract" "github.com/baking-bad/bcdhub/internal/models/operation" "github.com/baking-bad/bcdhub/internal/models/types" "github.com/baking-bad/bcdhub/internal/postgres/core" @@ -122,32 +126,72 @@ func (storage *Storage) GetByAccount(acc account.Account, size uint64, filters m // Last - get last operation by `filters` with not empty deffated_storage func (storage *Storage) Last(filters map[string]interface{}, lastID int64) (operation.Operation, error) { - query := 
storage.DB.Model((*operation.Operation)(nil)).Where("deffated_storage is not null").OrderExpr("operation.id desc") + var ( + current = time.Now() + endTime = consts.BeginningOfTime + ) + + if val, ok := filters["timestamp"]; ok { + if tf, ok := val.(core.TimestampFilter); ok { + switch { + case !tf.Lt.IsZero(): + current = tf.Lt + case !tf.Lte.IsZero(): + current = tf.Lte + } - for key, value := range filters { - query.Where("? = ?", pg.Ident(key), value) + switch { + case !tf.Gt.IsZero(): + endTime = tf.Gt + case !tf.Gte.IsZero(): + endTime = tf.Gte + } + } } - if lastID > 0 { - query.Where("operation.id < ?", lastID) - } + for current.After(endTime) { + query := storage.DB.Model((*operation.Operation)(nil)). + Where("deffated_storage is not null"). + OrderExpr("operation.id desc") - query.Limit(2) // It's a hack to avoid postgres "optimization". Limit = 1 is extremely slow. + for key, value := range filters { + switch val := value.(type) { + case core.TimestampFilter: + query = val.Apply(query) + default: + query.Where("? = ?", pg.Ident(key), value) + } + } - var ops []operation.Operation - if err := storage.DB.Model().TableExpr("(?) as operation", query). - ColumnExpr("operation.*"). - ColumnExpr("source.address as source__address"). - ColumnExpr("destination.address as destination__address"). - Join("LEFT JOIN accounts as source ON source.id = operation.source_id"). - Join("LEFT JOIN accounts as destination ON destination.id = operation.destination_id"). - Select(&ops); err != nil { - return operation.Operation{}, err - } - if len(ops) == 0 { - return operation.Operation{}, pg.ErrNoRows + lowCurrent := current.AddDate(0, -3, 0) + query. + Where("timestamp >= ?", lowCurrent). + Where("timestamp < ?", current) + + if lastID > 0 { + query.Where("operation.id < ?", lastID) + } + + query.Limit(1) + + var ops []operation.Operation + if err := storage.DB.Model().TableExpr("(?) as operation", query). + ColumnExpr("operation.*"). + ColumnExpr("source.address as source__address"). + ColumnExpr("destination.address as destination__address"). + Join("LEFT JOIN accounts as source ON source.id = operation.source_id"). + Join("LEFT JOIN accounts as destination ON destination.id = operation.destination_id"). + Select(&ops); err != nil { + return operation.Operation{}, err + } + if len(ops) > 0 { + return ops[0], nil + } + + current = lowCurrent } - return ops[0], nil + + return operation.Operation{}, pg.ErrNoRows } // Get - @@ -171,7 +215,7 @@ func (storage *Storage) Get(filters map[string]interface{}, size int64, sort boo } // GetByHash - -func (storage *Storage) GetByHash(hash string) (operations []operation.Operation, err error) { +func (storage *Storage) GetByHash(hash []byte) (operations []operation.Operation, err error) { query := storage.DB.Model((*operation.Operation)(nil)).Where("hash = ?", hash) addOperationSorting(query) err = storage.DB.Model().TableExpr("(?) as operation", query). @@ -232,52 +276,124 @@ func (storage *Storage) OPG(address string, size, lastID int64) ([]operation.OPG return nil, err } - limit := storage.GetPageSize(size) - - subQuery := storage.DB.Model(new(operation.Operation)). - Column("id", "hash", "counter", "status", "kind"). - WhereGroup( - func(q *orm.Query) (*orm.Query, error) { - return q.Where("destination_id = ?", accountID).WhereOr("source_id = ?", accountID), nil - }, - ). - Order("id desc"). 
- Limit(325) + var ( + end bool + result = make([]operation.OPG, 0) + lastAction = time.Now().UTC() + limit = storage.GetPageSize(size) + ) + lastActionSet := false if lastID > 0 { - subQuery.Where("id < ?", lastID) + op, err := storage.GetByID(lastID) + if err != nil { + if !storage.IsRecordNotFound(err) { + return nil, err + } + } else { + lastAction = op.Timestamp + lastActionSet = true + } + } + if !lastActionSet && bcd.IsContractLazy(address) { + if err := storage.DB.Model((*contract.Contract)(nil)). + Column("last_action"). + Where("account_id = ?", accountID). + Select(&lastAction); err != nil { + return nil, err + } } - var opg []operation.OPG - _, err := storage.DB.Query(&opg, ` + for !end { + startTime, endTime, err := helpers.QuarterBoundaries(lastAction) + if err != nil { + return nil, err + } + + subQuery := storage.DB.Model(new(operation.Operation)). + Column("id", "hash", "counter", "status", "kind", "level", "timestamp", "content_index", "entrypoint"). + WhereGroup( + func(q *orm.Query) (*orm.Query, error) { + return q.Where("destination_id = ?", accountID).WhereOr("source_id = ?", accountID), nil + }, + ). + Where("timestamp < ?", endTime). + Where("timestamp >= ?", startTime). + Order("id desc"). + Limit(1000) + + if lastID > 0 { + subQuery.Where("id < ?", lastID) + } + + var opg []operation.OPG + if _, err := storage.DB.Query(&opg, ` + with opg as (?0) select ta.last_id, ta.status, ta.counter, ta.kind, - (select sum(case when source_id = ? then -"amount" else "amount" end) as "flow" - from operations - where hash = ta.hash and counter = ta.counter) as "flow", + ta.hash, + ta.level, + ta.timestamp, + ta.entrypoint, + ta.content_index, + (select sum(case when source_id = ?1 then -"amount" else "amount" end) as "flow" + from operations + where hash = ta.hash and counter = ta.counter and (timestamp < ?4) and (timestamp >= ?3) + ) as "flow", (select sum(internal::integer) as internals - from operations - where hash = ta.hash and counter = ta.counter), + from operations + where hash = ta.hash and counter = ta.counter and (timestamp < ?4) and (timestamp >= ?3) + ), (select sum("burned") + sum("fee") as total_cost - from operations - where hash = ta.hash and counter = ta.counter), - ta.hash, operations.level, operations.timestamp, operations.entrypoint, operations.content_index from ( - select min(id) as last_id, hash, counter, max(status) as status, min(kind) as kind from (?) as t + from operations + where hash = ta.hash and counter = ta.counter and (timestamp < ?4) and (timestamp >= ?3) + ) + from ( + select + min(id) as last_id, + hash, + counter, + max(status) as status, + min(kind) as kind, + min(level) as level, + min(timestamp) as timestamp, + min(content_index) as content_index, + string_agg(entrypoint, ',') as entrypoint + from opg group by hash, counter order by last_id desc - limit ? + limit ?2 ) as ta - join operations on operations.id = ta.last_id order by last_id desc - `, accountID, subQuery, limit) - return opg, err + `, subQuery, accountID, limit, startTime, endTime); err != nil { + return nil, err + } + + count := int(size) - len(result) + if count < len(opg) { + opg = opg[:count] + } + + result = append(result, opg...) 
+ + if len(result) < limit { + lastAction = lastAction.AddDate(0, -3, 0) + if lastAction.Before(consts.BeginningOfTime) { + break + } + } + + end = len(result) == limit + } + + return result, nil } // GetByHashAndCounter - -func (storage *Storage) GetByHashAndCounter(hash string, counter int64) ([]operation.Operation, error) { +func (storage *Storage) GetByHashAndCounter(hash []byte, counter int64) ([]operation.Operation, error) { var operations []operation.Operation err := storage.DB.Model((*operation.Operation)(nil)). Where("hash = ?", hash). @@ -328,19 +444,6 @@ func (storage *Storage) EventsCount(accountID int64) (int, error) { Where("kind = 7").Count() } -const cteContractStatsTemplate = `with last_operations as ( - SELECT timestamp - FROM operations AS "operation" - WHERE ((destination_id = ?) OR (source_id = ?)) - order by timestamp desc - FETCH NEXT 20 ROWS ONLY -) -select max(timestamp) as timestamp from last_operations` - -type lastTimestampResult struct { - Timestamp time.Time `pg:"timestamp"` -} - // ContractStats - func (storage *Storage) ContractStats(address string) (stats operation.ContractStats, err error) { var accountID int64 @@ -351,11 +454,41 @@ func (storage *Storage) ContractStats(address string) (stats operation.ContractS return } - var result lastTimestampResult - if _, err := storage.DB.QueryOne(&result, cteContractStatsTemplate, accountID, accountID); err != nil { - return stats, err + if bcd.IsContractLazy(address) { + if err := storage.DB.Model((*contract.Contract)(nil)). + Column("last_action"). + Where("account_id = ?", accountID). + Select(&stats.LastAction); err != nil { + return stats, err + } + } else { + if err := storage.DB.Model((*operation.Operation)(nil)). + Column("timestamp"). + Where("destination_id = ?", accountID). + Order("timestamp desc"). + Limit(1). + Select(&stats.LastAction); err != nil { + if !storage.IsRecordNotFound(err) { + return stats, err + } + } + + var sourceLastAction time.Time + if err := storage.DB.Model((*operation.Operation)(nil)). + Column("timestamp"). + Where("source_id = ?", accountID). + Order("timestamp desc"). + Limit(1). + Select(&sourceLastAction); err != nil { + if !storage.IsRecordNotFound(err) { + return stats, err + } + } + + if sourceLastAction.After(stats.LastAction) { + stats.LastAction = sourceLastAction + } } - stats.LastAction = result.Timestamp count, err := storage.DB.Model((*operation.Operation)(nil)).WhereGroup( func(q *orm.Query) (*orm.Query, error) { diff --git a/internal/postgres/partition_manager.go b/internal/postgres/partition_manager.go new file mode 100644 index 000000000..ebf6ef5d9 --- /dev/null +++ b/internal/postgres/partition_manager.go @@ -0,0 +1,67 @@ +package postgres + +import ( + "context" + "fmt" + "time" + + "github.com/baking-bad/bcdhub/internal/helpers" + "github.com/baking-bad/bcdhub/internal/models" + "github.com/baking-bad/bcdhub/internal/models/bigmapdiff" + "github.com/baking-bad/bcdhub/internal/models/operation" + "github.com/baking-bad/bcdhub/internal/postgres/core" + "github.com/go-pg/pg/v10" +) + +// PartitionManager - +type PartitionManager struct { + conn *core.Postgres + + lastId string +} + +// NewPartitionManager - +func NewPartitionManager(conn *core.Postgres) *PartitionManager { + return &PartitionManager{ + conn: conn, + } +} + +const createPartitionTemplate = `CREATE TABLE IF NOT EXISTS ? PARTITION OF ? FOR VALUES FROM (?) 
TO (?);` + +func (pm *PartitionManager) partitionId(currentTime time.Time) string { + return fmt.Sprintf("%dQ%d", currentTime.Year(), helpers.QuarterOf(currentTime.Month())) +} + +// CreatePartitions - +func (pm *PartitionManager) CreatePartitions(ctx context.Context, currentTime time.Time) error { + id := pm.partitionId(currentTime) + if id == pm.lastId { + return nil + } + + start, end, err := helpers.QuarterBoundaries(currentTime) + if err != nil { + return err + } + + for _, model := range []models.Model{ + &operation.Operation{}, + &bigmapdiff.BigMapDiff{}, + } { + partitionName := fmt.Sprintf("%s_%s", model.GetIndex(), id) + if _, err := pm.conn.DB.ExecContext( + ctx, + createPartitionTemplate, + pg.Ident(partitionName), + pg.Ident(model.GetIndex()), + start.Format(time.RFC3339Nano), + end.Format(time.RFC3339Nano), + ); err != nil { + return err + } + } + + pm.lastId = id + return nil +} diff --git a/internal/postgres/store.go b/internal/postgres/store.go index 708e39e22..48078642c 100644 --- a/internal/postgres/store.go +++ b/internal/postgres/store.go @@ -1,6 +1,7 @@ package postgres import ( + "context" "time" "github.com/baking-bad/bcdhub/internal/models/bigmapdiff" @@ -20,11 +21,12 @@ type Store struct { Operations []*operation.Operation GlobalConstants []*contract.GlobalConstant - tx pg.DBI + partitions *PartitionManager + tx pg.DBI } // NewStore - -func NewStore(tx pg.DBI) *Store { +func NewStore(tx pg.DBI, pm *PartitionManager) *Store { return &Store{ BigMapState: make([]*bigmapdiff.BigMapState, 0), Contracts: make([]*contract.Contract, 0), @@ -32,7 +34,8 @@ func NewStore(tx pg.DBI) *Store { Operations: make([]*operation.Operation, 0), GlobalConstants: make([]*contract.GlobalConstant, 0), - tx: tx, + partitions: pm, + tx: tx, } } @@ -72,8 +75,8 @@ func (store *Store) ListOperations() []*operation.Operation { } // Save - -func (store *Store) Save() error { - if err := store.saveOperations(store.tx); err != nil { +func (store *Store) Save(ctx context.Context) error { + if err := store.saveOperations(ctx, store.tx); err != nil { return err } @@ -115,7 +118,7 @@ func (store *Store) saveMigrations(tx pg.DBI) error { return err } -func (store *Store) saveOperations(tx pg.DBI) error { +func (store *Store) saveOperations(ctx context.Context, tx pg.DBI) error { if len(store.Operations) == 0 { return nil } @@ -147,6 +150,10 @@ func (store *Store) saveOperations(tx pg.DBI) error { } } + if err := store.partitions.CreatePartitions(ctx, store.Operations[0].Timestamp); err != nil { + return err + } + if _, err := tx.Model(&store.Operations).Returning("id").Insert(); err != nil { return err }
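Note (editorial, not part of the patch): the sketch below is a minimal, standalone Go illustration of the quarterly partitioning scheme introduced in internal/helpers/quarters.go and internal/postgres/partition_manager.go. It mirrors the logic of helpers.QuarterOf, helpers.QuarterBoundaries and PartitionManager.partitionId using only the standard library; the lower-cased helper names and the sample timestamp are illustrative assumptions, not code from the repository.

package main

import (
	"fmt"
	"time"
)

// quarterOf mirrors helpers.QuarterOf: January..March -> 1, ..., October..December -> 4.
func quarterOf(month time.Month) int {
	return (int(month) + 2) / 3
}

// quarterBoundaries mirrors helpers.QuarterBoundaries: the half-open [start, end)
// UTC range of the calendar quarter containing t.
func quarterBoundaries(t time.Time) (time.Time, time.Time) {
	firstMonth := time.Month((quarterOf(t.Month())-1)*3 + 1)
	start := time.Date(t.Year(), firstMonth, 1, 0, 0, 0, 0, time.UTC)
	return start, start.AddDate(0, 3, 0)
}

func main() {
	ts := time.Date(2023, time.August, 15, 12, 0, 0, 0, time.UTC)
	start, end := quarterBoundaries(ts)

	// PartitionManager.partitionId builds the partition suffix as <year>Q<quarter>,
	// so an operation with this timestamp would land in a partition named like this one.
	name := fmt.Sprintf("operations_%dQ%d", ts.Year(), quarterOf(ts.Month()))

	fmt.Println(name)       // operations_2023Q3
	fmt.Println(start, end) // 2023-07-01 00:00:00 +0000 UTC 2023-10-01 00:00:00 +0000 UTC
}

CreatePartitions then runs the CREATE TABLE ... PARTITION OF ... FOR VALUES FROM (start) TO (end) template for the operations and big_map_diffs models at most once per quarter, caching the last partition id so repeated saves within the same quarter skip the DDL.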