Skip to content

Commit

Permalink
Merge pull request #23 from dipdup-io/refactoring/update-go-lib
Browse files Browse the repository at this point in the history
Refactoring: pg to bun
  • Loading branch information
aopoltorzhicky authored Oct 14, 2023
2 parents a56ae14 + 901a6a4 commit 7730bdb
Show file tree
Hide file tree
Showing 57 changed files with 966 additions and 968 deletions.
47 changes: 29 additions & 18 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,38 +1,49 @@
name: Tests
on: push
on:
push:

name: Tests
jobs:
lint:
name: lint
golangci:
name: Linter
runs-on: ubuntu-latest
steps:
- uses: actions/setup-go@v3
- name: Checkout code
uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.19'
- uses: actions/checkout@v3
go-version: '1.21.x'
cache: false

- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.51.2
args: --timeout=3m
version: v1.54.1
args: --timeout=5m
test:
name: Test
runs-on: ubuntu-latest
steps:
- name: install Go
uses: actions/setup-go@v2
- name: Checkout code
uses: actions/checkout@v4

- name: Install Go
uses: actions/setup-go@v4
with:
go-version: 1.19.x
- name: checkout code
uses: actions/checkout@v2
- uses: actions/cache@v2
go-version: 1.21.x
- uses: actions/cache@v3
with:
path: ~/go/pkg/mod
path: |
~/.cache/go-build
~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-
- name: golang tests
- name: Golang tests
env:
GO111MODULE: on
run: |
go mod download
go test ./...
go test -p 8 ./...
4 changes: 3 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ linters:
- ineffassign
- containedctx
- tenv
- musttag
- musttag
- mirror
- tagalign
3 changes: 1 addition & 2 deletions build/dipdup.ghostnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ mempool:
- smart_rollup_cement
datasources:
tzkt: ghostnet_tzkt
rpc:
- ghostnet_rpc
rpc: ghostnet_rpc

database:
kind: postgres
Expand Down
3 changes: 1 addition & 2 deletions build/dipdup.mainnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ mempool:
- smart_rollup_cement
datasources:
tzkt: mainnet_tzkt
rpc:
- mainnet_rpc
rpc: mainnet_rpc

database:
kind: postgres
Expand Down
9 changes: 3 additions & 6 deletions build/dipdup.testnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ mempool:
- smart_rollup_cement
datasources:
tzkt: ghostnet_tzkt
rpc:
- ghostnet_rpc
rpc: ghostnet_rpc

nairobinet:
filters:
Expand Down Expand Up @@ -92,8 +91,7 @@ mempool:
- smart_rollup_cement
datasources:
tzkt: nairobinet_tzkt
rpc:
- nairobinet_rpc
rpc: nairobinet_rpc

# oxfordnet:
# filters:
Expand Down Expand Up @@ -136,8 +134,7 @@ mempool:
# - smart_rollup_cement
# datasources:
# tzkt: oxfordnet_tzkt
# rpc:
# - oxfordnet_rpc
# rpc: oxfordnet_rpc

database:
kind: postgres
Expand Down
8 changes: 5 additions & 3 deletions cmd/mempool/bakers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
const unknownBaker = "unknown"

func (indexer *Indexer) setEndorsementBakers(ctx context.Context) {
defer indexer.wg.Done()

indexer.info().Msg("Thread for finding endorsement baker started")

for {
Expand All @@ -35,7 +33,11 @@ func (indexer *Indexer) setEndorsementBakers(ctx context.Context) {
} else {
endorsement.Baker = unknownBaker
}
if _, err := indexer.db.DB().Model(endorsement).WherePK().Update("baker", endorsement.Baker); err != nil {
if _, err := indexer.db.DB().NewUpdate().
Model(endorsement).
WherePK().
Set("baker = ?", endorsement.Baker).
Exec(ctx); err != nil {
log.Err(err).Msg("set baker to endorsement")
}
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/mempool/block_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ func fromMessage(block tzkt.BlockMessage) Block {
type BlockQueue struct {
queue []Block
levels map[string]uint64
onPop func(block Block) error
onPop func(ctx context.Context, block Block) error
onRollback func(ctx context.Context, block Block) error
capacity uint64
}

func newBlockQueue(capacity uint64, onPop func(block Block) error, onRollback func(ctx context.Context, block Block) error) *BlockQueue {
func newBlockQueue(capacity uint64, onPop func(ctx context.Context, block Block) error, onRollback func(ctx context.Context, block Block) error) *BlockQueue {
if capacity == 0 {
capacity = 60
}
Expand Down Expand Up @@ -66,7 +66,7 @@ func (bq *BlockQueue) Add(ctx context.Context, block tzkt.BlockMessage) error {
item := bq.queue[0]
bq.queue = bq.queue[1:]
if bq.onPop != nil {
if err := bq.onPop(item); err != nil {
if err := bq.onPop(ctx, item); err != nil {
return err
}
}
Expand Down
26 changes: 14 additions & 12 deletions cmd/mempool/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"sync"
"time"

"github.com/dipdup-io/workerpool"
)

// Cache -
Expand All @@ -13,7 +15,7 @@ type Cache struct {
ticker *time.Ticker
ttl time.Duration

wg sync.WaitGroup
g workerpool.Group
}

// NewCache -
Expand All @@ -22,6 +24,7 @@ func NewCache(ttl time.Duration) *Cache {
lookup: make(map[string]int64),
ttl: ttl,
ticker: time.NewTicker(time.Minute),
g: workerpool.NewGroup(),
}
}

Expand All @@ -43,31 +46,30 @@ func (c *Cache) Set(key string) {

// Start -
func (c *Cache) Start(ctx context.Context) {
c.wg.Add(1)
go c.checkExpiration(ctx)
c.g.GoCtx(ctx, c.checkExpiration)
}

func (c *Cache) checkExpiration(ctx context.Context) {
defer c.wg.Done()
func (c *Cache) Close() error {
c.g.Wait()
c.ticker.Stop()
return nil
}

func (c *Cache) checkExpiration(ctx context.Context) {
for {
select {
case <-ctx.Done():
c.ticker.Stop()
return

case <-c.ticker.C:
c.mux.RLock()
c.mux.Lock()
for key, expiration := range c.lookup {
if time.Now().UnixNano() <= expiration {
continue
}
c.mux.RUnlock()
c.mux.Lock()
delete(c.lookup, key)
c.mux.Unlock()
c.mux.RLock()
}
c.mux.RUnlock()
c.mux.Unlock()
}
}
}
35 changes: 17 additions & 18 deletions cmd/mempool/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,26 @@ import (
// Config
type Config struct {
config.Config `yaml:",inline"`
Mempool Mempool `yaml:"mempool" validate:"required"`
Mempool Mempool `validate:"required" yaml:"mempool"`
Profiler *profiler.Config `yaml:"profiler,omitempty"`
}

// Mempool -
type Mempool struct {
Indexers map[string]*Indexer `yaml:"indexers" validate:"required"`
Settings Settings `yaml:"settings" validate:"required"`
Indexers map[string]*Indexer `validate:"required" yaml:"indexers"`
Settings Settings `validate:"required" yaml:"settings"`
}

// Indexer -
type Indexer struct {
Filters Filters `yaml:"filters" validate:"required"`
DataSource MempoolDataSource `yaml:"datasources" validate:"required"`
Filters Filters `validate:"required" yaml:"filters"`
DataSource MempoolDataSource `validate:"required" yaml:"datasources"`
}

// Filters -
type Filters struct {
Accounts []*config.Alias[config.Contract] `yaml:"accounts" validate:"max=50"`
Kinds []string `yaml:"kinds" validate:"required,min=1,dive,oneof=activate_account ballot delegation double_baking_evidence double_endorsement_evidence endorsement endorsement_with_slot origination proposals reveal seed_nonce_revelation transaction register_global_constant"`
Accounts []*config.Alias[config.Contract] `validate:"max=50" yaml:"accounts"`
Kinds []string `validate:"required,min=1,dive,oneof=activate_account ballot delegation double_baking_evidence double_endorsement_evidence endorsement endorsement_with_slot origination proposals reveal seed_nonce_revelation transaction register_global_constant" yaml:"kinds"`
}

// Addresses -
Expand All @@ -41,23 +41,22 @@ func (f Filters) Addresses() []string {

// MempoolDataSource -
type MempoolDataSource struct {
Tzkt *config.Alias[config.DataSource] `yaml:"tzkt" validate:"required,url"`
RPC []*config.Alias[config.DataSource] `yaml:"rpc" validate:"required,min=1,dive,url"`
Tzkt *config.Alias[config.DataSource] `validate:"required,url" yaml:"tzkt"`
RPC *config.Alias[config.DataSource] `validate:"required,min=1,dive,url" yaml:"rpc"`
}

// URLs -
func (ds MempoolDataSource) URLs() []string {
urls := make([]string, 0)
for i := range ds.RPC {
urls = append(urls, ds.RPC[i].Struct().URL)
func (ds MempoolDataSource) URL() string {
if ds.RPC == nil {
return ""
}
return urls
return ds.RPC.Struct().URL
}

// Settings -
type Settings struct {
KeepOperations uint64 `yaml:"keep_operations_seconds" validate:"required,min=1"`
ExpiredAfter uint64 `yaml:"expired_after_blocks" validate:"required,min=1"`
KeepInChainBlocks uint64 `yaml:"keep_in_chain_blocks" validate:"required,min=1"`
GasStatsLifetime uint64 `yaml:"gas_stats_lifetime" validate:"required,min=1"`
KeepOperations uint64 `validate:"required,min=1" yaml:"keep_operations_seconds"`
ExpiredAfter uint64 `validate:"required,min=1" yaml:"expired_after_blocks"`
KeepInChainBlocks uint64 `validate:"required,min=1" yaml:"keep_in_chain_blocks"`
GasStatsLifetime uint64 `validate:"required,min=1" yaml:"gas_stats_lifetime"`
}
17 changes: 8 additions & 9 deletions cmd/mempool/config/substitutions.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,14 @@ func substituteDataSources(c *Config, dataSource *MempoolDataSource) error {
dataSource.Tzkt.SetStruct(source)
}

for i, link := range dataSource.RPC {
source, ok := c.DataSources[link.Name()]
if !ok {
continue
}
if source.Kind != DataSourceKindNode {
return errors.Errorf("Invalid RPC data source kind. Expected `tezos-node`, got `%s`", source.Kind)
}
dataSource.RPC[i].SetStruct(source)
source, ok := c.DataSources[dataSource.RPC.Name()]
if !ok {
return errors.Errorf("invalid rpc datasource: %s", dataSource.RPC.Name())
}
if source.Kind != DataSourceKindNode {
return errors.Errorf("Invalid RPC data source kind. Expected `tezos-node`, got `%s`", source.Kind)
}
dataSource.RPC.SetStruct(source)

return nil
}
Loading

0 comments on commit 7730bdb

Please sign in to comment.