diff --git a/cmd/sync.go b/cmd/sync.go index 7c4a7db34..cf8ae12b0 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -24,7 +24,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" - w "github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch" + w "github.com/vulcanize/ipfs-blockchain-watcher/pkg/sync" v "github.com/vulcanize/ipfs-blockchain-watcher/version" ) diff --git a/environments/superNodeSubscription.toml b/environments/superNodeSubscription.toml deleted file mode 100644 index 14a823790..000000000 --- a/environments/superNodeSubscription.toml +++ /dev/null @@ -1,30 +0,0 @@ -[watcher] - [watcher.ethSubscription] - historicalData = false - historicalDataOnly = false - startingBlock = 0 - endingBlock = 0 - wsPath = "ws://127.0.0.1:8080" - [watcher.ethSubscription.headerFilter] - off = false - uncles = false - [watcher.ethSubscription.txFilter] - off = false - src = [] - dst = [] - [watcher.ethSubscription.receiptFilter] - off = false - contracts = [] - topic0s = [] - topic1s = [] - topic2s = [] - topic3s = [] - [watcher.ethSubscription.stateFilter] - off = false - addresses = [] - intermediateNodes = false - [watcher.ethSubscription.storageFilter] - off = true - addresses = [] - storageKeys = [] - intermediateNodes = false \ No newline at end of file diff --git a/pkg/eth/cleaner.go b/pkg/eth/cleaner.go index f15c0da03..1cef25b1c 100644 --- a/pkg/eth/cleaner.go +++ b/pkg/eth/cleaner.go @@ -37,8 +37,8 @@ type DBCleaner struct { db *postgres.DB } -// NewCleaner returns a new Cleaner struct that satisfies the shared.Cleaner interface -func NewCleaner(db *postgres.DB) *DBCleaner { +// NewDBCleaner returns a new DBCleaner struct +func NewDBCleaner(db *postgres.DB) *DBCleaner { return &DBCleaner{ db: db, } diff --git a/pkg/eth/cleaner_test.go b/pkg/eth/cleaner_test.go index f742eddf2..44c4ab2ae 100644 --- a/pkg/eth/cleaner_test.go +++ b/pkg/eth/cleaner_test.go @@ -246,7 +246,7 @@ var _ = Describe("Cleaner", func() { db, err = shared.SetupDB() Expect(err).ToNot(HaveOccurred()) repo = eth.NewCIDIndexer(db) - cleaner = eth.NewCleaner(db) + cleaner = eth.NewDBCleaner(db) }) Describe("Clean", func() { BeforeEach(func() { diff --git a/pkg/eth/retriever.go b/pkg/eth/retriever.go index 76a6ae591..293fc1928 100644 --- a/pkg/eth/retriever.go +++ b/pkg/eth/retriever.go @@ -37,8 +37,8 @@ type GapRetriever struct { db *postgres.DB } -// NewRetriever returns a pointer to a new Retriever -func NewRetriever(db *postgres.DB) *GapRetriever { +// NewGapRetriever returns a pointer to a new GapRetriever +func NewGapRetriever(db *postgres.DB) *GapRetriever { return &GapRetriever{ db: db, } @@ -58,6 +58,7 @@ func (ecr *GapRetriever) RetrieveLastBlockNumber() (int64, error) { return blockNumber, err } +// DBGap type for querying for gaps in db type DBGap struct { Start uint64 `db:"start"` Stop uint64 `db:"stop"` diff --git a/pkg/eth/retriever_test.go b/pkg/eth/retriever_test.go index 04dcbf7b6..44e26dd89 100644 --- a/pkg/eth/retriever_test.go +++ b/pkg/eth/retriever_test.go @@ -40,7 +40,7 @@ var _ = Describe("Retriever", func() { db, err = shared.SetupDB() Expect(err).ToNot(HaveOccurred()) repo = eth2.NewIPLDPublisher(db) - retriever = eth2.NewRetriever(db) + retriever = eth2.NewGapRetriever(db) }) AfterEach(func() { eth.TearDownDB(db) @@ -308,7 +308,7 @@ var _ = Describe("Retriever", func() { err = repo.Publish(payload14) Expect(err).ToNot(HaveOccurred()) - cleaner := eth.NewCleaner(db) + cleaner := eth.NewDBCleaner(db) err = cleaner.ResetValidation([][2]uint64{{101, 102}, {104, 104}, {106, 108}}) Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/historical/config.go b/pkg/historical/config.go index 0279a2cf1..bf55a17b2 100644 --- a/pkg/historical/config.go +++ b/pkg/historical/config.go @@ -59,15 +59,6 @@ type Config struct { // NewConfig is used to initialize a historical config from a .toml file func NewConfig() (*Config, error) { c := new(Config) - c.DBConfig.Init() - if err := c.init(); err != nil { - return nil, err - } - - return c, nil -} - -func (c *Config) init() error { var err error viper.BindEnv("ethereum.httpPath", shared.ETH_HTTP_PATH) @@ -83,12 +74,6 @@ func (c *Config) init() error { } c.Timeout = time.Second * time.Duration(timeout) - ethHTTP := viper.GetString("ethereum.httpPath") - c.NodeInfo, c.HTTPClient, err = shared.GetEthNodeAndClient(fmt.Sprintf("http://%s", ethHTTP)) - if err != nil { - return err - } - freq := viper.GetInt("backfill.frequency") var frequency time.Duration if freq <= 0 { @@ -101,18 +86,24 @@ func (c *Config) init() error { c.Workers = uint64(viper.GetInt64("backfill.workers")) c.ValidationLevel = viper.GetInt("backfill.validationLevel") - dbConn := overrideDBConnConfig(c.DBConfig) - db := utils.LoadPostgres(dbConn, c.NodeInfo) + ethHTTP := viper.GetString("ethereum.httpPath") + c.NodeInfo, c.HTTPClient, err = shared.GetEthNodeAndClient(fmt.Sprintf("http://%s", ethHTTP)) + if err != nil { + return nil, err + } + + c.DBConfig.Init() + overrideDBConnConfig(&c.DBConfig) + db := utils.LoadPostgres(c.DBConfig, c.NodeInfo) c.DB = &db - return nil + return c, nil } -func overrideDBConnConfig(con postgres.Config) postgres.Config { +func overrideDBConnConfig(con *postgres.Config) { viper.BindEnv("database.backfill.maxIdle", BACKFILL_MAX_IDLE_CONNECTIONS) viper.BindEnv("database.backfill.maxOpen", BACKFILL_MAX_OPEN_CONNECTIONS) viper.BindEnv("database.backfill.maxLifetime", BACKFILL_MAX_CONN_LIFETIME) con.MaxIdle = viper.GetInt("database.backfill.maxIdle") con.MaxOpen = viper.GetInt("database.backfill.maxOpen") con.MaxLifetime = viper.GetInt("database.backfill.maxLifetime") - return con } diff --git a/pkg/historical/service.go b/pkg/historical/service.go index 13550277c..ba10e8292 100644 --- a/pkg/historical/service.go +++ b/pkg/historical/service.go @@ -70,7 +70,7 @@ func NewBackfillService(settings *Config) (Backfill, error) { } bs.Converter = eth.NewPayloadConverter(bs.ChainConfig) bs.Publisher = eth.NewIPLDPublisher(settings.DB) - bs.Retriever = eth.NewRetriever(settings.DB) + bs.Retriever = eth.NewGapRetriever(settings.DB) bs.BatchSize = settings.BatchSize if bs.BatchSize == 0 { bs.BatchSize = shared.DefaultMaxBatchSize @@ -79,6 +79,9 @@ func NewBackfillService(settings *Config) (Backfill, error) { if bs.Workers == 0 { bs.Workers = shared.DefaultMaxBatchNumber } + bs.QuitChan = make(chan bool) + bs.validationLevel = settings.ValidationLevel + bs.GapCheckFrequency = settings.Frequency return bs, nil } diff --git a/pkg/postgres/errors.go b/pkg/postgres/errors.go index eac04b8c1..f368f900b 100644 --- a/pkg/postgres/errors.go +++ b/pkg/postgres/errors.go @@ -1,3 +1,19 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + package postgres import ( diff --git a/pkg/resync/config.go b/pkg/resync/config.go index 67428d6f6..389d2355e 100644 --- a/pkg/resync/config.go +++ b/pkg/resync/config.go @@ -39,6 +39,10 @@ const ( RESYNC_CLEAR_OLD_CACHE = "RESYNC_CLEAR_OLD_CACHE" RESYNC_TYPE = "RESYNC_TYPE" RESYNC_RESET_VALIDATION = "RESYNC_RESET_VALIDATION" + + RESYNC_MAX_IDLE_CONNECTIONS = "RESYNC_MAX_IDLE_CONNECTIONS" + RESYNC_MAX_OPEN_CONNECTIONS = "RESYNC_MAX_OPEN_CONNECTIONS" + RESYNC_MAX_CONN_LIFETIME = "RESYNC_MAX_CONN_LIFETIME" ) // Config holds the parameters needed to perform a resync @@ -85,6 +89,8 @@ func NewConfig() (*Config, error) { c.Ranges = [][2]uint64{{start, stop}} c.ClearOldCache = viper.GetBool("resync.clearOldCache") c.ResetValidation = viper.GetBool("resync.resetValidation") + c.BatchSize = uint64(viper.GetInt64("resync.batchSize")) + c.Workers = uint64(viper.GetInt64("resync.workers")) resyncType := viper.GetString("resync.type") c.ResyncType, err = shared.GenerateDataTypeFromString(resyncType) @@ -105,10 +111,17 @@ func NewConfig() (*Config, error) { } c.DBConfig.Init() + overrideDBConnConfig(&c.DBConfig) db := utils.LoadPostgres(c.DBConfig, c.NodeInfo) c.DB = &db - - c.BatchSize = uint64(viper.GetInt64("resync.batchSize")) - c.Workers = uint64(viper.GetInt64("resync.workers")) return c, nil } + +func overrideDBConnConfig(con *postgres.Config) { + viper.BindEnv("database.resync.maxIdle", RESYNC_MAX_IDLE_CONNECTIONS) + viper.BindEnv("database.resync.maxOpen", RESYNC_MAX_OPEN_CONNECTIONS) + viper.BindEnv("database.resync.maxLifetime", RESYNC_MAX_CONN_LIFETIME) + con.MaxIdle = viper.GetInt("database.resync.maxIdle") + con.MaxOpen = viper.GetInt("database.resync.maxOpen") + con.MaxLifetime = viper.GetInt("database.resync.maxLifetime") +} diff --git a/pkg/resync/service.go b/pkg/resync/service.go index 8db557392..0d753e271 100644 --- a/pkg/resync/service.go +++ b/pkg/resync/service.go @@ -69,7 +69,7 @@ func NewResyncService(settings *Config) (Resync, error) { } rs.Converter = eth.NewPayloadConverter(rs.ChainConfig) rs.Publisher = eth.NewIPLDPublisher(settings.DB) - rs.Cleaner = eth.NewCleaner(settings.DB) + rs.Cleaner = eth.NewDBCleaner(settings.DB) rs.BatchSize = settings.BatchSize if rs.BatchSize == 0 { rs.BatchSize = shared.DefaultMaxBatchSize @@ -78,6 +78,11 @@ func NewResyncService(settings *Config) (Resync, error) { if rs.Workers == 0 { rs.Workers = shared.DefaultMaxBatchNumber } + rs.resetValidation = settings.ResetValidation + rs.clearOldCache = settings.ClearOldCache + rs.quitChan = make(chan bool) + rs.ranges = settings.Ranges + rs.data = settings.ResyncType return rs, nil } diff --git a/pkg/watch/config.go b/pkg/sync/config.go similarity index 80% rename from pkg/watch/config.go rename to pkg/sync/config.go index 4bee5a7ea..68ec1d1d3 100644 --- a/pkg/watch/config.go +++ b/pkg/sync/config.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package watch +package sync import ( "fmt" @@ -30,7 +30,7 @@ import ( // Env variables const ( - WORKERS = "SYNC_WORKERS" + SYNC_WORKERS = "SYNC_WORKERS" SYNC_MAX_IDLE_CONNECTIONS = "SYNC_MAX_IDLE_CONNECTIONS" SYNC_MAX_OPEN_CONNECTIONS = "SYNC_MAX_OPEN_CONNECTIONS" @@ -41,43 +41,42 @@ const ( type Config struct { DB *postgres.DB DBConfig postgres.Config - Workers int + Workers int64 WSClient *rpc.Client NodeInfo node.Info } -// NewConfig is used to initialize a watcher config from a .toml file -// Separate chain watcher instances need to be ran with separate ipfs path in order to avoid lock contention on the ipfs repository lockfile +// NewConfig is used to initialize a sync config from a .toml file func NewConfig() (*Config, error) { c := new(Config) var err error - viper.BindEnv("sync.workers", WORKERS) + viper.BindEnv("sync.workers", SYNC_WORKERS) viper.BindEnv("ethereum.wsPath", shared.ETH_WS_PATH) - c.DBConfig.Init() - workers := viper.GetInt("sync.workers") + workers := viper.GetInt64("sync.workers") if workers < 1 { workers = 1 } c.Workers = workers + ethWS := viper.GetString("ethereum.wsPath") c.NodeInfo, c.WSClient, err = shared.GetEthNodeAndClient(fmt.Sprintf("ws://%s", ethWS)) if err != nil { return nil, err } - syncDBConn := overrideDBConnConfig(c.DBConfig) - syncDB := utils.LoadPostgres(syncDBConn, c.NodeInfo) - c.DB = &syncDB + c.DBConfig.Init() + overrideDBConnConfig(&c.DBConfig) + syncDB := utils.LoadPostgres(c.DBConfig, c.NodeInfo) + c.DB = &syncDB return c, nil } -func overrideDBConnConfig(con postgres.Config) postgres.Config { +func overrideDBConnConfig(con *postgres.Config) { viper.BindEnv("database.sync.maxIdle", SYNC_MAX_IDLE_CONNECTIONS) viper.BindEnv("database.sync.maxOpen", SYNC_MAX_OPEN_CONNECTIONS) viper.BindEnv("database.sync.maxLifetime", SYNC_MAX_CONN_LIFETIME) con.MaxIdle = viper.GetInt("database.sync.maxIdle") con.MaxOpen = viper.GetInt("database.sync.maxOpen") con.MaxLifetime = viper.GetInt("database.sync.maxLifetime") - return con } diff --git a/pkg/watch/service.go b/pkg/sync/service.go similarity index 91% rename from pkg/watch/service.go rename to pkg/sync/service.go index ae2d5c8a8..a03599112 100644 --- a/pkg/watch/service.go +++ b/pkg/sync/service.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package watch +package sync import ( "sync" @@ -35,7 +35,7 @@ const ( PayloadChanBufferSize = 2000 ) -// Indexer is the top level interface for streaming, converting to IPLDs, publishing, and indexing all chain data +// Indexer is the top level interface for streaming, converting to IPLDs, publishing, and indexing all chain data at head // This service is compatible with the Ethereum service interface (node.Service) type Indexer interface { // APIs(), Protocols(), Start() and Stop() @@ -63,7 +63,7 @@ type Service struct { // Info for the Geth node that this indexer is working with NodeInfo *node.Info // Number of sync workers - WorkerPoolSize int + Workers int64 // chain type for this service ChainConfig *params.ChainConfig } @@ -81,7 +81,7 @@ func NewIndexerService(settings *Config) (Indexer, error) { sn.Converter = eth.NewPayloadConverter(sn.ChainConfig) sn.Publisher = eth.NewIPLDPublisher(settings.DB) sn.QuitChan = make(chan bool) - sn.WorkerPoolSize = settings.Workers + sn.Workers = settings.Workers sn.NodeInfo = &settings.NodeInfo return sn, nil } @@ -98,7 +98,6 @@ func (sap *Service) APIs() []rpc.API { // Sync streams incoming raw chain data and converts it for further processing // It forwards the converted data to the publish process(es) it spins up -// If forwards the converted data to a ScreenAndServe process if it there is one listening on the passed screenAndServePayload channel // This continues on no matter if or how many subscribers there are func (sap *Service) Sync(wg *sync.WaitGroup) error { sub, err := sap.Streamer.Stream(sap.PayloadChan) @@ -106,8 +105,8 @@ func (sap *Service) Sync(wg *sync.WaitGroup) error { return err } // spin up publish worker goroutines - publishPayload := make(chan *eth.ConvertedPayload, PayloadChanBufferSize) - for i := 1; i <= sap.WorkerPoolSize; i++ { + publishPayload := make(chan eth.ConvertedPayload, PayloadChanBufferSize) + for i := 1; i <= int(sap.Workers); i++ { go sap.publish(wg, i, publishPayload) log.Debugf("ethereum publish worker %d successfully spun up", i) } @@ -126,10 +125,10 @@ func (sap *Service) Sync(wg *sync.WaitGroup) error { // Forward the payload to the publish workers // this channel acts as a ring buffer select { - case publishPayload <- ipldPayload: + case publishPayload <- *ipldPayload: default: <-publishPayload - publishPayload <- ipldPayload + publishPayload <- *ipldPayload } case err := <-sub.Err(): log.Errorf("ethereumm indexer subscription error: %v", err) @@ -145,14 +144,14 @@ func (sap *Service) Sync(wg *sync.WaitGroup) error { // publish is spun up by SyncAndConvert and receives converted chain data from that process // it publishes this data to IPFS and indexes their CIDs with useful metadata in Postgres -func (sap *Service) publish(wg *sync.WaitGroup, id int, publishPayload <-chan *eth.ConvertedPayload) { +func (sap *Service) publish(wg *sync.WaitGroup, id int, publishPayload <-chan eth.ConvertedPayload) { wg.Add(1) defer wg.Done() for { select { case payload := <-publishPayload: log.Debugf("ethereumindexer sync worker %d publishing and indexing data streamed at head height %d", id, payload.Block.Number().Uint64()) - if err := sap.Publisher.Publish(*payload); err != nil { + if err := sap.Publisher.Publish(payload); err != nil { log.Errorf("ethereum indexer publish worker %d publishing error: %v", id, err) continue } diff --git a/pkg/watch/service_test.go b/pkg/sync/service_test.go similarity index 87% rename from pkg/watch/service_test.go rename to pkg/sync/service_test.go index 800fc9845..9f6f4580e 100644 --- a/pkg/watch/service_test.go +++ b/pkg/sync/service_test.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package watch_test +package sync_test import ( "sync" @@ -26,7 +26,7 @@ import ( . "github.com/onsi/gomega" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch" + s "github.com/vulcanize/ipfs-blockchain-watcher/pkg/sync" ) var _ = Describe("Service", func() { @@ -49,13 +49,13 @@ var _ = Describe("Service", func() { ReturnIPLDPayload: &mocks.MockConvertedPayload, ReturnErr: nil, } - processor := &watch.Service{ - Publisher: mockPublisher, - Streamer: mockStreamer, - Converter: mockConverter, - PayloadChan: payloadChan, - QuitChan: quitChan, - WorkerPoolSize: 1, + processor := &s.Service{ + Publisher: mockPublisher, + Streamer: mockStreamer, + Converter: mockConverter, + PayloadChan: payloadChan, + QuitChan: quitChan, + Workers: 1, } err := processor.Sync(wg) Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/watch/watch_suite_test.go b/pkg/sync/sync_suite_test.go similarity index 90% rename from pkg/watch/watch_suite_test.go rename to pkg/sync/sync_suite_test.go index 821ae69ba..19e886a7c 100644 --- a/pkg/watch/watch_suite_test.go +++ b/pkg/sync/sync_suite_test.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package watch_test +package sync_test import ( "io/ioutil" @@ -25,9 +25,9 @@ import ( "github.com/sirupsen/logrus" ) -func TestIPFSWatcher(t *testing.T) { +func TestIPLDSync(t *testing.T) { RegisterFailHandler(Fail) - RunSpecs(t, "IPFS Watcher Suite Test") + RunSpecs(t, "IPLD Sync Suite Test") } var _ = BeforeSuite(func() { diff --git a/pkg/watch/subscription.go b/pkg/watch/subscription.go deleted file mode 100644 index 1b3474c78..000000000 --- a/pkg/watch/subscription.go +++ /dev/null @@ -1,60 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package watch - -import ( - "errors" - - "github.com/ethereum/go-ethereum/rpc" -) - -type Flag int32 - -const ( - EmptyFlag Flag = iota - BackFillCompleteFlag -) - -// Subscription holds the information for an individual client subscription to the watcher -type Subscription struct { - ID rpc.ID - PayloadChan chan<- SubscriptionPayload - QuitChan chan<- bool -} - -// SubscriptionPayload is the struct for a watcher data subscription payload -// It carries data of a type specific to the chain being supported/queried and an error message -type SubscriptionPayload struct { - Data []byte `json:"data"` // e.g. for Ethereum rlp serialized eth.StreamPayload - Height int64 `json:"height"` - Err string `json:"err"` // field for error - Flag Flag `json:"flag"` // field for message -} - -func (sp SubscriptionPayload) Error() error { - if sp.Err == "" { - return nil - } - return errors.New(sp.Err) -} - -func (sp SubscriptionPayload) BackFillComplete() bool { - if sp.Flag == BackFillCompleteFlag { - return true - } - return false -}