Skip to content

Commit

Permalink
Minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
aopoltorzhicky committed Nov 30, 2021
1 parent 010f4ab commit 94e14cf
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 39 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"program": "${workspaceFolder}/cmd/mempool",
"args": [
"-c",
"../../build/dipdup.yml"
"../../build/dipdup.testnet.yml"
],
"envFile": "${workspaceFolder}/.env"
}
Expand Down
15 changes: 8 additions & 7 deletions cmd/mempool/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func NewIndexer(ctx context.Context, network string, indexerCfg config.Indexer,

// Start -
func (indexer *Indexer) Start(ctx context.Context) error {
indexer.log().WithField("kinds", indexer.filters.Kinds).Info("Starting...")
indexer.log().WithField("kinds", indexer.filters.Kinds).Info("starting...")

if err := indexer.initState(); err != nil {
return err
Expand Down Expand Up @@ -178,7 +178,8 @@ func (indexer *Indexer) initState() error {

// Close -
func (indexer *Indexer) Close() error {
indexer.log().Info("Stopping...")
indexer.log().Info("stopping...")

indexer.wg.Wait()

if err := indexer.tzkt.Close(); err != nil {
Expand All @@ -196,7 +197,7 @@ func (indexer *Indexer) Close() error {
return err
}

indexer.log().Info("Indexer was stopped")
indexer.log().Info("indexer was stopped")
return nil
}

Expand All @@ -222,7 +223,7 @@ func (indexer *Indexer) listen(ctx context.Context) {
case receiver.StatusApplied:
applied, ok := msg.Body.(node.Applied)
if !ok {
indexer.log().Errorf("Invalid applied operation %v", applied)
indexer.log().Errorf("invalid applied operation %v", applied)
continue
}
if indexer.isHashProcessed(applied.Hash) {
Expand All @@ -235,7 +236,7 @@ func (indexer *Indexer) listen(ctx context.Context) {
case receiver.StatusBranchDelayed, receiver.StatusBranchRefused, receiver.StatusRefused, receiver.StatusUnprocessed:
failed, ok := msg.Body.(node.FailedMonitor)
if !ok {
indexer.log().Errorf("Invalid %s operation %v", msg.Status, failed)
indexer.log().Errorf("invalid %s operation %v", msg.Status, failed)
continue
}
if indexer.isHashProcessed(failed.Hash) {
Expand All @@ -246,7 +247,7 @@ func (indexer *Indexer) listen(ctx context.Context) {
continue
}
default:
indexer.log().Errorf("Invalid mempool operation status %s", msg.Status)
indexer.log().Errorf("invalid mempool operation status %s", msg.Status)
}
}
}
Expand All @@ -262,7 +263,7 @@ func (indexer *Indexer) isHashProcessed(hash string) bool {
}

func (indexer *Indexer) onPopBlockQueue(block Block) error {
indexer.log().WithField("level", block.Level).Infof("Operations with branch %s is expired", block.Branch)
indexer.log().WithField("level", block.Level).Infof("operations with branch %s is expired", block.Branch)
return indexer.db.Transaction(func(tx *gorm.DB) error {
return models.SetExpired(tx, indexer.network, block.Branch, indexer.filters.Kinds...)
})
Expand Down
71 changes: 40 additions & 31 deletions cmd/mempool/tzkt/tzkt.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ func (tzkt *TzKT) Connect(ctx context.Context) error {
return
case msg := <-tzkt.client.Listen():
switch msg.Channel {
case "operations":
case events.ChannelOperations:
if err := tzkt.handleOperationMessage(msg); err != nil {
log.Error(err)
}
case "blocks":
case events.ChannelBlocks:
if err := tzkt.handleBlockMessage(msg); err != nil {
log.Error(err)
}
Expand Down Expand Up @@ -388,17 +388,21 @@ func (tzkt *TzKT) init(ctx context.Context, state syncState, indexerState, headL
msg := newOperationMessage()

for !state.finished() {

for table := state.nextToRequest(); table != nil; table = state.nextToRequest() {
if err := tzkt.getTableData(ctx, table, indexerState, headLevel); err != nil {
return err
select {
case <-ctx.Done():
return ctx.Err()
default:
for table := state.nextToRequest(); table != nil; table = state.nextToRequest() {
if err := tzkt.getTableData(ctx, table, indexerState, headLevel); err != nil {
return err
}
}
}

sort.Sort(state)
sort.Sort(state)

if err := tzkt.processSync(state, &msg); err != nil {
return err
if err := tzkt.processSync(ctx, state, &msg); err != nil {
return err
}
}
}

Expand All @@ -423,30 +427,35 @@ func (tzkt *TzKT) Subscribe() error {
return nil
}

func (tzkt *TzKT) processSync(state syncState, msg *OperationMessage) error {
func (tzkt *TzKT) processSync(ctx context.Context, state syncState, msg *OperationMessage) error {
for len(state[0].Items) > 0 {
table := state[0]

operation := table.Items[0]
msg.Hash.LoadOrStore(operation.Hash, operation)
table.LastID = operation.ID

switch {
case msg.Level == 0:
msg.Level = operation.Level
msg.Block = operation.Block
case msg.Level != operation.Level:
tzkt.blocks <- BlockMessage{
Type: events.MessageTypeData,
Level: msg.Level,
Hash: msg.Block,
select {
case <-ctx.Done():
return ctx.Err()
default:
table := state[0]

operation := table.Items[0]
msg.Hash.LoadOrStore(operation.Hash, operation)
table.LastID = operation.ID

switch {
case msg.Level == 0:
msg.Level = operation.Level
msg.Block = operation.Block
case msg.Level != operation.Level:
tzkt.blocks <- BlockMessage{
Type: events.MessageTypeData,
Level: msg.Level,
Hash: msg.Block,
}
tzkt.operations <- msg.copy()
msg.clear()
}
tzkt.operations <- msg.copy()
msg.clear()
}

table.Items = table.Items[1:]
sort.Sort(state)
table.Items = table.Items[1:]
sort.Sort(state)
}
}

if msg.Level > 0 && state.finished() {
Expand Down

0 comments on commit 94e14cf

Please sign in to comment.