Skip to content

Commit

Permalink
Make component configurations public
Browse files Browse the repository at this point in the history
  • Loading branch information
SamuelSarle committed May 9, 2023
1 parent 2a74ec6 commit 04a7fec
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 53 deletions.
19 changes: 8 additions & 11 deletions db/badger/blockHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"fmt"

dbh "github.com/aurora-is-near/relayer2-base/db"
"github.com/aurora-is-near/relayer2-base/db/badger/core"
"github.com/aurora-is-near/relayer2-base/db/badger/core/dbkey"
"github.com/aurora-is-near/relayer2-base/db/codec"
Expand All @@ -20,23 +19,23 @@ import (
)

type BlockHandler struct {
Config *Config
db *core.DB
config *Config
}

func NewBlockHandler() (dbh.BlockHandler, error) {
func NewBlockHandler() (*BlockHandler, error) {
return NewBlockHandlerWithCodec(codec.NewTinypackCodec())
}

func NewBlockHandlerWithCodec(codec codec.Codec) (dbh.BlockHandler, error) {
func NewBlockHandlerWithCodec(codec codec.Codec) (*BlockHandler, error) {
config := GetConfig()
db, err := core.NewDB(config.Core, codec)
if err != nil {
return nil, err
}
return &BlockHandler{
db: db,
config: config,
Config: config,
}, nil
}

Expand Down Expand Up @@ -307,7 +306,6 @@ func (h *BlockHandler) GetFilterLogs(ctx context.Context, filter *dbt.LogFilter)
}

func (h *BlockHandler) GetFilterChanges(ctx context.Context, filter any) (*[]interface{}, error) {

var err error
filterChanges := make([]interface{}, 0)
if bf, ok := filter.(*dbt.BlockFilter); ok {
Expand Down Expand Up @@ -408,7 +406,6 @@ func (h *BlockHandler) BlockNumberToHash(ctx context.Context, number common.BN64
}

func (h *BlockHandler) InsertBlock(block *indexer.Block) error {

writer := h.db.NewWriter()
defer writer.Cancel()

Expand Down Expand Up @@ -499,18 +496,18 @@ func (h *BlockHandler) getLogs(ctx context.Context, txn *core.ViewTxn, filter *d
} else {
limit := int(100_000) // Max limit
// If the block range is higher than ScanRangeThreshold, then limit the maximum logs in the response to MaxScanIterators
if to.BlockHeight-from.BlockHeight > uint64(h.config.Core.ScanRangeThreshold) {
limit = int(h.config.Core.MaxScanIterators)
if to.BlockHeight-from.BlockHeight > uint64(h.Config.Core.ScanRangeThreshold) {
limit = int(h.Config.Core.MaxScanIterators)
}
resp, lastKey, err = txn.ReadLogs(ctx, chainId, from, to, addresses, topics, limit)
if err != nil {
if err == core.ErrLimited {
var err error
if limit == int(h.config.Core.MaxScanIterators) {
if limit == int(h.Config.Core.MaxScanIterators) {
err = &errs.LogResponseRangeLimitError{
Err: fmt.Errorf("Log response size exceeded. You can make eth_getLogs requests with "+
"up to a %d block range, or you can request any block range with a cap of %d logs in the response.",
int(h.config.Core.ScanRangeThreshold), int(h.config.Core.MaxScanIterators)),
int(h.Config.Core.ScanRangeThreshold), int(h.Config.Core.MaxScanIterators)),
}
} else {
err = &errs.LogResponseRangeLimitError{
Expand Down
11 changes: 5 additions & 6 deletions db/badger/filterHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package badger
import (
"context"
"errors"
dbh "github.com/aurora-is-near/relayer2-base/db"

"github.com/aurora-is-near/relayer2-base/db/badger/core"
"github.com/aurora-is-near/relayer2-base/db/codec"
dbt "github.com/aurora-is-near/relayer2-base/types/db"
Expand All @@ -12,23 +12,23 @@ import (
)

type FilterHandler struct {
Config *Config
db *core.DB
config *Config
}

func NewFilterHandler() (dbh.FilterHandler, error) {
func NewFilterHandler() (*FilterHandler, error) {
return NewFilterHandlerWithCodec(codec.NewTinypackCodec())
}

func NewFilterHandlerWithCodec(codec codec.Codec) (dbh.FilterHandler, error) {
func NewFilterHandlerWithCodec(codec codec.Codec) (*FilterHandler, error) {
config := GetConfig()
db, err := core.NewDB(config.Core, codec)
if err != nil {
return nil, err
}
return &FilterHandler{
db: db,
config: config,
Config: config,
}, nil
}

Expand Down Expand Up @@ -126,7 +126,6 @@ func (h *FilterHandler) DeleteFilter(ctx context.Context, filterId primitives.Da
}
}
return errors.New("filter not found")

})
}

Expand Down
35 changes: 20 additions & 15 deletions indexer/prehistory/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,28 @@ package prehistory
import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"

"github.com/aurora-is-near/relayer2-base/db"
"github.com/aurora-is-near/relayer2-base/log"
"github.com/aurora-is-near/relayer2-base/types/indexer"
"github.com/aurora-is-near/relayer2-base/types/primitives"
"github.com/aurora-is-near/relayer2-base/utils"

"fmt"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
)

const (
blankHash = "0x0000000000000000000000000000000000000000000000000000000000000000"
)

type Indexer struct {
Config *Config
dbh db.Handler
config *Config
logger *log.Logger
lock sync.Mutex
started bool
Expand Down Expand Up @@ -53,7 +52,7 @@ func New(dbh db.Handler) (*Indexer, error) {
logger := log.Log()
config := GetConfig()
if !config.IndexFromPrehistory {
return nil, nil
return &Indexer{Config: config}, nil
}

if config.To > config.PrehistoryHeight {
Expand All @@ -67,9 +66,9 @@ func New(dbh db.Handler) (*Indexer, error) {
}

i := &Indexer{
Config: config,
dbh: dbh,
logger: logger,
config: config,
stopCh: make(chan struct{}),
reader: PreHistoryReader{},
}
Expand All @@ -78,6 +77,9 @@ func New(dbh db.Handler) (*Indexer, error) {

// Start starts the prehistory indexing as a goroutine based on the config file settings
func (i *Indexer) Start() {
if !i.Config.IndexFromPrehistory {
return
}
i.lock.Lock()
defer i.lock.Unlock()
if !i.started {
Expand All @@ -88,6 +90,9 @@ func (i *Indexer) Start() {

// Close gracefully stops the prehistory indexer
func (i *Indexer) Close() {
if !i.started {
return
}
i.lock.Lock()
defer i.lock.Unlock()
i.logger.Info().Msgf("Prehistory indexer reveived close signal")
Expand All @@ -97,9 +102,9 @@ func (i *Indexer) Close() {
// Start starts the prehistory indexing as a goroutine based on the config file settings
func (i *Indexer) index() {
var err error
i.reader.dbPool, err = pgxpool.New(context.Background(), i.config.ArchiveURL)
i.reader.dbPool, err = pgxpool.New(context.Background(), i.Config.ArchiveURL)
if err != nil {
i.logger.Error().Msgf("Unable to connect to prehistory database %s: %v\n", i.config.ArchiveURL, err)
i.logger.Error().Msgf("Unable to connect to prehistory database %s: %v\n", i.Config.ArchiveURL, err)
return
}
defer i.reader.dbPool.Close()
Expand All @@ -114,10 +119,10 @@ func (i *Indexer) index() {
quantity := primitives.QuantityFromBytes(emptyBytes)
parentHash := primitives.Data32FromBytes(emptyBytes)
blockHash := primitives.Data32FromBytes(emptyBytes)
chainId := i.config.PrehistoryChainId
from := i.config.From
to := i.config.To
step := i.config.BatchSize
chainId := i.Config.PrehistoryChainId
from := i.Config.From
to := i.Config.To
step := i.Config.BatchSize

i.reader.startTime = time.Now()
i.logger.Info().Msgf("prehistory indexing started fromBlock: [%d] toBlock: [%d]", from, to)
Expand Down
28 changes: 14 additions & 14 deletions indexer/tar/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,48 @@ package tar
import (
"bytes"
"fmt"

"github.com/aurora-is-near/stream-backup/chunks"
"github.com/aurora-is-near/stream-backup/messagebackup"
"github.com/fxamacker/cbor/v2"

"github.com/aurora-is-near/relayer2-base/db"
"github.com/aurora-is-near/relayer2-base/db/codec"
"github.com/aurora-is-near/relayer2-base/log"
"github.com/aurora-is-near/relayer2-base/types/indexer"
"github.com/aurora-is-near/stream-backup/messagebackup"
"github.com/fxamacker/cbor/v2"
)
import "github.com/aurora-is-near/relayer2-base/log"
import "github.com/aurora-is-near/stream-backup/chunks"

type Indexer struct {
Config *Config
dbh db.Handler
config *Config
reader chunks.Chunks
logger *log.Logger
mode cbor.DecMode
}

func New(dbh db.Handler) (*Indexer, error) {

logger := log.Log()
config := GetConfig()

if !config.IndexFromBackup {
return nil, nil
return &Indexer{Config: config}, nil
}

i := &Indexer{
Config: config,
dbh: dbh,
logger: logger,
config: config,
mode: codec.CborDecoder(),
reader: chunks.Chunks{
Dir: config.Dir,
ChunkNamePrefix: config.NamePrefix,
}}
},
}
return i, nil
}

func (i *Indexer) Start() {

if !i.config.IndexFromBackup {
if !i.Config.IndexFromBackup {
return
}

Expand All @@ -52,7 +53,7 @@ func (i *Indexer) Start() {
}
defer i.reader.CloseReader()

if err := i.reader.SeekReader(i.config.From); err != nil {
if err := i.reader.SeekReader(i.Config.From); err != nil {
i.logger.Fatal().Err(err).Msg("failed to position file reader")
}

Expand Down Expand Up @@ -83,15 +84,14 @@ func (i *Indexer) Start() {
if err != nil {
i.logger.Fatal().Err(err).Msgf("failed to insert block [%d]", block.Height)
}
if i.config.To != 0 && i.config.To == seq {
if i.Config.To != 0 && i.Config.To == seq {
break
}
}
i.logger.Info().Msgf("backup indexer finished")
}

func (i *Indexer) Close() {

}

func DecodeAugmentedCBOR[T any](input []byte, mode cbor.DecMode) (*T, error) {
Expand Down
9 changes: 7 additions & 2 deletions log/log.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package log

import (
"github.com/aurora-is-near/relayer2-base/syncutils"
"io"
"os"

"github.com/rs/zerolog"

"github.com/aurora-is-near/relayer2-base/syncutils"
)

var globalPtr syncutils.LockablePtr[Logger]

type Logger struct {
zerolog.Logger
Config *Config
}

func (l *Logger) HandleConfigChange() {
Expand Down Expand Up @@ -50,5 +52,8 @@ func log() *Logger {
if config.LogToFile {
writers = append(writers, NewFileWriter(config.FilePath))
}
return &Logger{zerolog.New(io.MultiWriter(writers...)).With().Timestamp().Logger()}
return &Logger{
Logger: zerolog.New(io.MultiWriter(writers...)).With().Timestamp().Logger(),
Config: config,
}
}
12 changes: 7 additions & 5 deletions rpcnode/github-ethereum-go-ethereum/rpcnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import (
"os"
"time"

"github.com/aurora-is-near/relayer2-base/broker"
"github.com/aurora-is-near/relayer2-base/log"
eventbroker "github.com/aurora-is-near/relayer2-base/rpcnode/github-ethereum-go-ethereum/events"
"golang.org/x/net/context"

gel "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/jinzhu/copier"
"golang.org/x/net/context"

"github.com/aurora-is-near/relayer2-base/broker"
"github.com/aurora-is-near/relayer2-base/log"
eventbroker "github.com/aurora-is-near/relayer2-base/rpcnode/github-ethereum-go-ethereum/events"
)

const (
Expand All @@ -25,6 +25,7 @@ const (
type GoEthereum struct {
node.Node
Broker broker.Broker
Config *Config
}

type connection struct {
Expand Down Expand Up @@ -70,6 +71,7 @@ func NewWithConf(conf *Config) (*GoEthereum, error) {
return &GoEthereum{
Node: *n,
Broker: eb,
Config: conf,
}, nil
}

Expand Down

0 comments on commit 04a7fec

Please sign in to comment.