Skip to content

Commit

Permalink
Fix: starting indexer process
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Nov 29, 2021
1 parent 7186e91 commit b663d45
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 18 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ test:
go test ./...

mempool:
cd cmd/mempool && go run . -c ../../build/dipdup.yml
cd cmd/mempool && go run . -c ../../build/dipdup.testnet.yml

local:
docker-compose -f docker-compose.yml up -d --build
2 changes: 1 addition & 1 deletion cmd/mempool/bakers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
func (indexer *Indexer) setEndorsementBakers(ctx context.Context) {
defer indexer.wg.Done()

log.Info("Thread for finding endorsement baker started")
log.WithField("network", indexer.network).Info("Thread for finding endorsement baker started")

ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
Expand Down
6 changes: 3 additions & 3 deletions cmd/mempool/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ func NewIndexer(ctx context.Context, network string, indexerCfg config.Indexer,
}

rpc := node.NewNodeRPC(indexerCfg.DataSource.RPC[0])
constants, err := rpc.Constants()
constants, err := rpc.Constants(node.WithContext(ctx))
if err != nil {
return nil, err
}
if len(constants.TimeBetweenBlocks) == 0 {
return nil, errors.Errorf("Empty time_between_blocks in node response: %s", network)
}

head, err := rpc.Header("head")
head, err := rpc.Header("head", node.WithContext(ctx))
if err != nil {
return nil, err
}
Expand All @@ -74,7 +74,7 @@ func NewIndexer(ctx context.Context, network string, indexerCfg config.Indexer,

expiredAfter := settings.ExpiredAfter
if expiredAfter == 0 {
metadata, err := rpc.HeadMetadata("head")
metadata, err := rpc.HeadMetadata("head", node.WithContext(ctx))
if err != nil {
return nil, err
}
Expand Down
66 changes: 53 additions & 13 deletions cmd/mempool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os/signal"
"strings"
"syscall"
"time"

"github.com/dipdup-net/go-lib/cmdline"
libCfg "github.com/dipdup-net/go-lib/config"
Expand Down Expand Up @@ -38,6 +39,8 @@ func main() {
signal.Notify(signals, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

indexers := make(map[string]*Indexer)

kinds := make(map[string]struct{})
Expand All @@ -49,23 +52,41 @@ func main() {
prometheusService.Start()
}

indexerCancels := make(map[string]context.CancelFunc)
for network, mempool := range cfg.Mempool.Indexers {
for _, kind := range mempool.Filters.Kinds {
kinds[kind] = struct{}{}
}

indexer, err := NewIndexer(ctx, network, *mempool, cfg.Database, cfg.Mempool.Settings, prometheusService)
if err != nil {
log.Error(err)
return
}
indexers[network] = indexer

if err := indexer.Start(ctx); err != nil {
log.Error(err)
return
}
go func(network string, mempool *config.Indexer) {
indexerCancel, err := startIndexer(ctx, network, cfg, mempool, prometheusService)
if err != nil {
log.Error(err)
} else {
indexerCancels[network] = indexerCancel
return
}

ticker := time.NewTicker(time.Minute)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
cancelFunc, err := startIndexer(ctx, network, cfg, mempool, prometheusService)
if err != nil {
log.Error(err)
} else {
indexerCancels[network] = cancelFunc
return
}
}
}
}(network, mempool)
}

views, err := createViews(ctx, cfg.Database)
if err != nil {
log.Error(err)
Expand All @@ -85,9 +106,13 @@ func main() {
}

<-signals
cancel()

log.Warn("Trying carefully stopping....")

for network, indexerCancel := range indexerCancels {
log.Infof("stopping %s indexer...", network)
indexerCancel()
}

for _, indexer := range indexers {
if err := indexer.Close(); err != nil {
log.Error(err)
Expand Down Expand Up @@ -140,3 +165,18 @@ func createViews(ctx context.Context, database libCfg.Database) ([]string, error

return views, nil
}

func startIndexer(ctx context.Context, network string, cfg config.Config, mempool *config.Indexer, prometheusService *prometheus.Service) (context.CancelFunc, error) {
indexerCtx, cancel := context.WithCancel(ctx)
indexer, err := NewIndexer(indexerCtx, network, *mempool, cfg.Database, cfg.Mempool.Settings, prometheusService)
if err != nil {
cancel()
return nil, err
}

if err := indexer.Start(indexerCtx); err != nil {
cancel()
return nil, err
}
return cancel, nil
}

0 comments on commit b663d45

Please sign in to comment.