Skip to content
This repository has been archived by the owner on Sep 28, 2021. It is now read-only.

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
i-norden committed Aug 26, 2020
1 parent 7986e71 commit e6b9efa
Show file tree
Hide file tree
Showing 16 changed files with 96 additions and 159 deletions.
2 changes: 1 addition & 1 deletion cmd/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/viper"

w "github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch"
w "github.com/vulcanize/ipfs-blockchain-watcher/pkg/sync"
v "github.com/vulcanize/ipfs-blockchain-watcher/version"
)

Expand Down
30 changes: 0 additions & 30 deletions environments/superNodeSubscription.toml

This file was deleted.

4 changes: 2 additions & 2 deletions pkg/eth/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ type DBCleaner struct {
db *postgres.DB
}

// NewCleaner returns a new Cleaner struct that satisfies the shared.Cleaner interface
func NewCleaner(db *postgres.DB) *DBCleaner {
// NewDBCleaner returns a new DBCleaner struct
func NewDBCleaner(db *postgres.DB) *DBCleaner {
return &DBCleaner{
db: db,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/eth/cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ var _ = Describe("Cleaner", func() {
db, err = shared.SetupDB()
Expect(err).ToNot(HaveOccurred())
repo = eth.NewCIDIndexer(db)
cleaner = eth.NewCleaner(db)
cleaner = eth.NewDBCleaner(db)
})
Describe("Clean", func() {
BeforeEach(func() {
Expand Down
5 changes: 3 additions & 2 deletions pkg/eth/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ type GapRetriever struct {
db *postgres.DB
}

// NewRetriever returns a pointer to a new Retriever
func NewRetriever(db *postgres.DB) *GapRetriever {
// NewGapRetriever returns a pointer to a new GapRetriever
func NewGapRetriever(db *postgres.DB) *GapRetriever {
return &GapRetriever{
db: db,
}
Expand All @@ -58,6 +58,7 @@ func (ecr *GapRetriever) RetrieveLastBlockNumber() (int64, error) {
return blockNumber, err
}

// DBGap type for querying for gaps in db
type DBGap struct {
Start uint64 `db:"start"`
Stop uint64 `db:"stop"`
Expand Down
4 changes: 2 additions & 2 deletions pkg/eth/retriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var _ = Describe("Retriever", func() {
db, err = shared.SetupDB()
Expect(err).ToNot(HaveOccurred())
repo = eth2.NewIPLDPublisher(db)
retriever = eth2.NewRetriever(db)
retriever = eth2.NewGapRetriever(db)
})
AfterEach(func() {
eth.TearDownDB(db)
Expand Down Expand Up @@ -308,7 +308,7 @@ var _ = Describe("Retriever", func() {
err = repo.Publish(payload14)
Expect(err).ToNot(HaveOccurred())

cleaner := eth.NewCleaner(db)
cleaner := eth.NewDBCleaner(db)
err = cleaner.ResetValidation([][2]uint64{{101, 102}, {104, 104}, {106, 108}})
Expect(err).ToNot(HaveOccurred())

Expand Down
31 changes: 11 additions & 20 deletions pkg/historical/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,6 @@ type Config struct {
// NewConfig is used to initialize a historical config from a .toml file
func NewConfig() (*Config, error) {
c := new(Config)
c.DBConfig.Init()
if err := c.init(); err != nil {
return nil, err
}

return c, nil
}

func (c *Config) init() error {
var err error

viper.BindEnv("ethereum.httpPath", shared.ETH_HTTP_PATH)
Expand All @@ -83,12 +74,6 @@ func (c *Config) init() error {
}
c.Timeout = time.Second * time.Duration(timeout)

ethHTTP := viper.GetString("ethereum.httpPath")
c.NodeInfo, c.HTTPClient, err = shared.GetEthNodeAndClient(fmt.Sprintf("http://%s", ethHTTP))
if err != nil {
return err
}

freq := viper.GetInt("backfill.frequency")
var frequency time.Duration
if freq <= 0 {
Expand All @@ -101,18 +86,24 @@ func (c *Config) init() error {
c.Workers = uint64(viper.GetInt64("backfill.workers"))
c.ValidationLevel = viper.GetInt("backfill.validationLevel")

dbConn := overrideDBConnConfig(c.DBConfig)
db := utils.LoadPostgres(dbConn, c.NodeInfo)
ethHTTP := viper.GetString("ethereum.httpPath")
c.NodeInfo, c.HTTPClient, err = shared.GetEthNodeAndClient(fmt.Sprintf("http://%s", ethHTTP))
if err != nil {
return nil, err
}

c.DBConfig.Init()
overrideDBConnConfig(&c.DBConfig)
db := utils.LoadPostgres(c.DBConfig, c.NodeInfo)
c.DB = &db
return nil
return c, nil
}

func overrideDBConnConfig(con postgres.Config) postgres.Config {
func overrideDBConnConfig(con *postgres.Config) {
viper.BindEnv("database.backfill.maxIdle", BACKFILL_MAX_IDLE_CONNECTIONS)
viper.BindEnv("database.backfill.maxOpen", BACKFILL_MAX_OPEN_CONNECTIONS)
viper.BindEnv("database.backfill.maxLifetime", BACKFILL_MAX_CONN_LIFETIME)
con.MaxIdle = viper.GetInt("database.backfill.maxIdle")
con.MaxOpen = viper.GetInt("database.backfill.maxOpen")
con.MaxLifetime = viper.GetInt("database.backfill.maxLifetime")
return con
}
5 changes: 4 additions & 1 deletion pkg/historical/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func NewBackfillService(settings *Config) (Backfill, error) {
}
bs.Converter = eth.NewPayloadConverter(bs.ChainConfig)
bs.Publisher = eth.NewIPLDPublisher(settings.DB)
bs.Retriever = eth.NewRetriever(settings.DB)
bs.Retriever = eth.NewGapRetriever(settings.DB)
bs.BatchSize = settings.BatchSize
if bs.BatchSize == 0 {
bs.BatchSize = shared.DefaultMaxBatchSize
Expand All @@ -79,6 +79,9 @@ func NewBackfillService(settings *Config) (Backfill, error) {
if bs.Workers == 0 {
bs.Workers = shared.DefaultMaxBatchNumber
}
bs.QuitChan = make(chan bool)
bs.validationLevel = settings.ValidationLevel
bs.GapCheckFrequency = settings.Frequency
return bs, nil
}

Expand Down
16 changes: 16 additions & 0 deletions pkg/postgres/errors.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.

// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package postgres

import (
Expand Down
19 changes: 16 additions & 3 deletions pkg/resync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ const (
RESYNC_CLEAR_OLD_CACHE = "RESYNC_CLEAR_OLD_CACHE"
RESYNC_TYPE = "RESYNC_TYPE"
RESYNC_RESET_VALIDATION = "RESYNC_RESET_VALIDATION"

RESYNC_MAX_IDLE_CONNECTIONS = "RESYNC_MAX_IDLE_CONNECTIONS"
RESYNC_MAX_OPEN_CONNECTIONS = "RESYNC_MAX_OPEN_CONNECTIONS"
RESYNC_MAX_CONN_LIFETIME = "RESYNC_MAX_CONN_LIFETIME"
)

// Config holds the parameters needed to perform a resync
Expand Down Expand Up @@ -85,6 +89,8 @@ func NewConfig() (*Config, error) {
c.Ranges = [][2]uint64{{start, stop}}
c.ClearOldCache = viper.GetBool("resync.clearOldCache")
c.ResetValidation = viper.GetBool("resync.resetValidation")
c.BatchSize = uint64(viper.GetInt64("resync.batchSize"))
c.Workers = uint64(viper.GetInt64("resync.workers"))

resyncType := viper.GetString("resync.type")
c.ResyncType, err = shared.GenerateDataTypeFromString(resyncType)
Expand All @@ -105,10 +111,17 @@ func NewConfig() (*Config, error) {
}

c.DBConfig.Init()
overrideDBConnConfig(&c.DBConfig)
db := utils.LoadPostgres(c.DBConfig, c.NodeInfo)
c.DB = &db

c.BatchSize = uint64(viper.GetInt64("resync.batchSize"))
c.Workers = uint64(viper.GetInt64("resync.workers"))
return c, nil
}

func overrideDBConnConfig(con *postgres.Config) {
viper.BindEnv("database.resync.maxIdle", RESYNC_MAX_IDLE_CONNECTIONS)
viper.BindEnv("database.resync.maxOpen", RESYNC_MAX_OPEN_CONNECTIONS)
viper.BindEnv("database.resync.maxLifetime", RESYNC_MAX_CONN_LIFETIME)
con.MaxIdle = viper.GetInt("database.resync.maxIdle")
con.MaxOpen = viper.GetInt("database.resync.maxOpen")
con.MaxLifetime = viper.GetInt("database.resync.maxLifetime")
}
7 changes: 6 additions & 1 deletion pkg/resync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func NewResyncService(settings *Config) (Resync, error) {
}
rs.Converter = eth.NewPayloadConverter(rs.ChainConfig)
rs.Publisher = eth.NewIPLDPublisher(settings.DB)
rs.Cleaner = eth.NewCleaner(settings.DB)
rs.Cleaner = eth.NewDBCleaner(settings.DB)
rs.BatchSize = settings.BatchSize
if rs.BatchSize == 0 {
rs.BatchSize = shared.DefaultMaxBatchSize
Expand All @@ -78,6 +78,11 @@ func NewResyncService(settings *Config) (Resync, error) {
if rs.Workers == 0 {
rs.Workers = shared.DefaultMaxBatchNumber
}
rs.resetValidation = settings.ResetValidation
rs.clearOldCache = settings.ClearOldCache
rs.quitChan = make(chan bool)
rs.ranges = settings.Ranges
rs.data = settings.ResyncType
return rs, nil
}

Expand Down
25 changes: 12 additions & 13 deletions pkg/watch/config.go → pkg/sync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package watch
package sync

import (
"fmt"
Expand All @@ -30,7 +30,7 @@ import (

// Env variables
const (
WORKERS = "SYNC_WORKERS"
SYNC_WORKERS = "SYNC_WORKERS"

SYNC_MAX_IDLE_CONNECTIONS = "SYNC_MAX_IDLE_CONNECTIONS"
SYNC_MAX_OPEN_CONNECTIONS = "SYNC_MAX_OPEN_CONNECTIONS"
Expand All @@ -41,43 +41,42 @@ const (
type Config struct {
DB *postgres.DB
DBConfig postgres.Config
Workers int
Workers int64
WSClient *rpc.Client
NodeInfo node.Info
}

// NewConfig is used to initialize a watcher config from a .toml file
// Separate chain watcher instances need to be ran with separate ipfs path in order to avoid lock contention on the ipfs repository lockfile
// NewConfig is used to initialize a sync config from a .toml file
func NewConfig() (*Config, error) {
c := new(Config)
var err error
viper.BindEnv("sync.workers", WORKERS)
viper.BindEnv("sync.workers", SYNC_WORKERS)
viper.BindEnv("ethereum.wsPath", shared.ETH_WS_PATH)
c.DBConfig.Init()

workers := viper.GetInt("sync.workers")
workers := viper.GetInt64("sync.workers")
if workers < 1 {
workers = 1
}
c.Workers = workers

ethWS := viper.GetString("ethereum.wsPath")
c.NodeInfo, c.WSClient, err = shared.GetEthNodeAndClient(fmt.Sprintf("ws://%s", ethWS))
if err != nil {
return nil, err
}
syncDBConn := overrideDBConnConfig(c.DBConfig)
syncDB := utils.LoadPostgres(syncDBConn, c.NodeInfo)
c.DB = &syncDB

c.DBConfig.Init()
overrideDBConnConfig(&c.DBConfig)
syncDB := utils.LoadPostgres(c.DBConfig, c.NodeInfo)
c.DB = &syncDB
return c, nil
}

func overrideDBConnConfig(con postgres.Config) postgres.Config {
func overrideDBConnConfig(con *postgres.Config) {
viper.BindEnv("database.sync.maxIdle", SYNC_MAX_IDLE_CONNECTIONS)
viper.BindEnv("database.sync.maxOpen", SYNC_MAX_OPEN_CONNECTIONS)
viper.BindEnv("database.sync.maxLifetime", SYNC_MAX_CONN_LIFETIME)
con.MaxIdle = viper.GetInt("database.sync.maxIdle")
con.MaxOpen = viper.GetInt("database.sync.maxOpen")
con.MaxLifetime = viper.GetInt("database.sync.maxLifetime")
return con
}
Loading

0 comments on commit e6b9efa

Please sign in to comment.