Skip to content

Commit

Permalink
Fixed indexer initialization: added indexer state to indexer channel …
Browse files Browse the repository at this point in the history
…state (#3)
  • Loading branch information
k-karuna authored Sep 2, 2024
1 parent 14ef1d3 commit 48313ee
Showing 1 changed file with 17 additions and 10 deletions.
27 changes: 17 additions & 10 deletions cmd/metadata/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sync"
"time"

ipfs "github.com/dipdup-io/ipfs-tools"
"github.com/dipdup-io/ipfs-tools"
"github.com/dipdup-io/starknet-go-api/pkg/data"
"github.com/dipdup-io/starknet-indexer/pkg/grpc"
"github.com/dipdup-io/starknet-indexer/pkg/grpc/pb"
Expand All @@ -28,6 +28,8 @@ const (
OutputName = "output"
)

const IndexerName = "Indexer"

// Indexer -
type Indexer struct {
modules.BaseModule
Expand All @@ -50,7 +52,7 @@ func NewIndexer(cfg Metadata, datasources map[string]config.DataSource, pg postg
indexer := &Indexer{
client: client,
storage: pg,
BaseModule: modules.New("Indexer"),
BaseModule: modules.New(IndexerName),
state: new(models.State),
wg: new(sync.WaitGroup),
}
Expand Down Expand Up @@ -113,16 +115,21 @@ func (indexer *Indexer) Subscribe(ctx context.Context, subscriptions map[string]

func (indexer *Indexer) init(ctx context.Context) error {
state, err := indexer.storage.State.ByName(ctx, indexer.Name())
switch {
case err == nil:
indexer.state = &state
return nil
case indexer.storage.State.IsNoRows(err):
if err != nil {
if !indexer.storage.State.IsNoRows(err) {
return err
}

indexer.state.Name = indexer.Name()
return indexer.storage.State.Save(ctx, indexer.state)
default:
return err
if err := indexer.storage.State.Save(ctx, indexer.state); err != nil {
return err
}
} else {
indexer.state = &state
}

indexer.channel.state = indexer.state
return nil
}

func (indexer *Indexer) listen(ctx context.Context) {
Expand Down

0 comments on commit 48313ee

Please sign in to comment.