From d8a5358a7013455a23627aa530acd1ef66cc3e97 Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Thu, 12 Aug 2021 11:53:41 +0530 Subject: [PATCH] Remove ipld-eth-indexer dependency. --- cmd/serve.go | 2 +- .../00004_create_eth_header_cids_table.sql | 1 + ...0006_create_eth_transaction_cids_table.sql | 1 + .../00016_create_eth_log_cids_table.sql | 64 +++++++++++++ go.mod | 3 +- go.sum | 2 + pkg/eth/api_test.go | 46 +++++++--- pkg/eth/backend.go | 10 +-- pkg/eth/backend_utils.go | 3 +- pkg/eth/cid_retriever.go | 89 +++++++++---------- pkg/eth/cid_retriever_test.go | 86 ++++++++++++------ pkg/eth/eth_state_test.go | 51 ++++++----- pkg/eth/filterer.go | 41 +++++---- pkg/eth/filterer_test.go | 3 +- pkg/eth/ipld_fetcher.go | 19 ++-- pkg/eth/ipld_fetcher_test.go | 27 ++++-- pkg/eth/ipld_retriever.go | 3 +- pkg/eth/test_helpers.go | 9 +- pkg/eth/test_helpers/test_data.go | 82 +++++++++-------- pkg/eth/types.go | 30 +++++-- pkg/graphql/graphql_test.go | 82 ++++++++--------- pkg/serve/api.go | 3 +- pkg/serve/config.go | 25 +++--- pkg/serve/env.go | 41 +++++++++ pkg/serve/service.go | 13 ++- pkg/shared/functions.go | 2 +- pkg/shared/test_helpers.go | 2 +- test_config/test_config.go | 7 +- 28 files changed, 468 insertions(+), 279 deletions(-) create mode 100644 db/migrations/00016_create_eth_log_cids_table.sql create mode 100644 pkg/serve/env.go diff --git a/cmd/serve.go b/cmd/serve.go index aa21f55fa..e45e5ddf4 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -28,9 +28,9 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" + "github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/gap-filler/pkg/mux" - "github.com/vulcanize/ipld-eth-indexer/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/graphql" srpc "github.com/vulcanize/ipld-eth-server/pkg/rpc" s "github.com/vulcanize/ipld-eth-server/pkg/serve" diff --git a/db/migrations/00004_create_eth_header_cids_table.sql b/db/migrations/00004_create_eth_header_cids_table.sql index 339eb427b..f01b116a0 100644 --- a/db/migrations/00004_create_eth_header_cids_table.sql +++ b/db/migrations/00004_create_eth_header_cids_table.sql @@ -16,6 +16,7 @@ CREATE TABLE eth.header_cids ( bloom BYTEA NOT NULL, timestamp NUMERIC NOT NULL, times_validated INTEGER NOT NULL DEFAULT 1, + base_fee BIGINT, UNIQUE (block_number, block_hash) ); diff --git a/db/migrations/00006_create_eth_transaction_cids_table.sql b/db/migrations/00006_create_eth_transaction_cids_table.sql index 8be504f37..fc65932d5 100644 --- a/db/migrations/00006_create_eth_transaction_cids_table.sql +++ b/db/migrations/00006_create_eth_transaction_cids_table.sql @@ -9,6 +9,7 @@ CREATE TABLE eth.transaction_cids ( dst VARCHAR(66) NOT NULL, src VARCHAR(66) NOT NULL, tx_data BYTEA, + tx_type BYTEA, UNIQUE (header_id, tx_hash) ); diff --git a/db/migrations/00016_create_eth_log_cids_table.sql b/db/migrations/00016_create_eth_log_cids_table.sql new file mode 100644 index 000000000..fc25a836b --- /dev/null +++ b/db/migrations/00016_create_eth_log_cids_table.sql @@ -0,0 +1,64 @@ +-- +goose Up +CREATE TABLE eth.log_cids ( + id SERIAL PRIMARY KEY, + receipt_id INTEGER NOT NULL REFERENCES eth.receipt_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED, + address TEXT NOT NULL, + cid TEXT NOT NULL, + mh_key TEXT NOT NULL REFERENCES public.blocks (key) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED, + block_number BIGINT NOT NULL, + block_hash VARCHAR(66) NOT NULL, + tx_hash VARCHAR(66) NOT NULL, + tx_index INTEGER NOT NULL, + index INTEGER NOT NULL, + topic0s VARCHAR(66)[], + topic1s VARCHAR(66)[], + topic2s VARCHAR(66)[], + topic3s VARCHAR(66)[], + UNIQUE (block_hash, tx_hash, index) +); + +-- TODO: Remove topics from receipts to avoid redundancy. +-- ALTER TABLE eth.receipt_cids +-- DROP COLUMN topic0s, +-- DROP COLUMN topic1s, +-- DROP COLUMN topic2s, +-- DROP COLUMN topic3s, +ALTER TABLE eth.receipt_cids +ADD COLUMN log_root VARCHAR(66); + +CREATE INDEX log_rct_id_index ON eth.log_cids USING btree (receipt_id); + +CREATE INDEX log_mh_index ON eth.log_cids USING btree (mh_key); + +CREATE INDEX log_cid_index ON eth.log_cids USING btree (cid); + +-- +-- Name: log_topic0_index; Type: INDEX; Schema: eth; Owner: - +-- + +CREATE INDEX log_topic0_index ON eth.log_cids USING gin (topic0s); + + +-- +-- Name: log_topic1_index; Type: INDEX; Schema: eth; Owner: - +-- + +CREATE INDEX log_topic1_index ON eth.log_cids USING gin (topic1s); + + +-- +-- Name: log_topic2_index; Type: INDEX; Schema: eth; Owner: - +-- + +CREATE INDEX log_topic2_index ON eth.log_cids USING gin (topic2s); + + +-- +-- Name: log_topic3_index; Type: INDEX; Schema: eth; Owner: - +-- + +CREATE INDEX log_topic3_index ON eth.log_cids USING gin (topic3s); + + +-- +goose Down +DROP TABLE eth.logs; \ No newline at end of file diff --git a/go.mod b/go.mod index ca9d8eec1..384c2bb9b 100644 --- a/go.mod +++ b/go.mod @@ -24,9 +24,8 @@ require ( github.com/tklauser/go-sysconf v0.3.6 // indirect github.com/vulcanize/gap-filler v0.3.1 github.com/vulcanize/ipfs-ethdb v0.0.2-alpha - github.com/vulcanize/ipld-eth-indexer v0.7.1-alpha.0.20210805022537-b4692fa49849 ) -replace github.com/ethereum/go-ethereum v1.9.25 => github.com/vulcanize/go-ethereum v1.10.4-statediff-0.0.25 +replace github.com/ethereum/go-ethereum v1.9.25 => /Users/arijitdas/go/src/github.com/ethereum/go-ethereum replace github.com/vulcanize/ipfs-ethdb v0.0.2-alpha => github.com/vulcanize/pg-ipfs-ethdb v0.0.2-alpha \ No newline at end of file diff --git a/go.sum b/go.sum index 0c6a1826a..0d6347205 100644 --- a/go.sum +++ b/go.sum @@ -497,6 +497,8 @@ github.com/huin/goupnp v1.0.0 h1:wg75sLpL6DZqwHQN6E1Cfk6mtfzS45z8OV+ic+DtHRo= github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc= github.com/huin/goupnp v1.0.1-0.20210310174557-0ca763054c88 h1:bcAj8KroPf552TScjFPIakjH2/tdIrIH8F+cc4v4SRo= github.com/huin/goupnp v1.0.1-0.20210310174557-0ca763054c88/go.mod h1:nNs7wvRfN1eKaMknBydLNQU6146XQim8t4h+q90biWo= +github.com/huin/goupnp v1.0.1-0.20210626160114-33cdcbb30dda h1:Vofqyy/Ysqit++X33unU0Gr08b6P35hKm3juytDrBVI= +github.com/huin/goupnp v1.0.1-0.20210626160114-33cdcbb30dda/go.mod h1:0dxJBVBHqTMjIUMkESDTNgOOx/Mw5wYIfyFmdzSamkM= github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150 h1:vlNjIqmUZ9CMAWsbURYl3a6wZbw7q5RHVvlXTNS/Bs8= github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6 h1:UDMh68UUwekSh5iP2OMhRRZJiiBccgV7axzUG8vi56c= diff --git a/pkg/eth/api_test.go b/pkg/eth/api_test.go index 8fea4ea69..587a0c673 100644 --- a/pkg/eth/api_test.go +++ b/pkg/eth/api_test.go @@ -30,13 +30,12 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/statediff/indexer" + "github.com/ethereum/go-ethereum/statediff/indexer/node" + "github.com/ethereum/go-ethereum/statediff/indexer/postgres" + "github.com/ethereum/go-ethereum/statediff/indexer/shared" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - - eth2 "github.com/vulcanize/ipld-eth-indexer/pkg/eth" - "github.com/vulcanize/ipld-eth-indexer/pkg/postgres" - "github.com/vulcanize/ipld-eth-indexer/pkg/shared" - "github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers" ) @@ -182,6 +181,18 @@ var ( } ) +// SetupDB is use to setup a db for watcher tests +func SetupDB() (*postgres.DB, error) { + uri := postgres.DbConnectionString(postgres.ConnectionParams{ + User: "vdbm", + Password: "password", + Hostname: "localhost", + Name: "vulcanize_testing", + Port: 8077, + }) + return postgres.NewDB(uri, postgres.ConnectionConfig{}, node.Info{}) +} + var _ = Describe("API", func() { var ( db *postgres.DB @@ -191,10 +202,14 @@ var _ = Describe("API", func() { // Test db setup, rather than using BeforeEach we only need to setup once since the tests do not mutate the database // Note: if you focus one of the tests be sure to focus this and the defered It() It("test init", func() { - var err error - db, err = shared.SetupDB() + var ( + err error + tx *indexer.BlockTx + ) + + db, err = SetupDB() Expect(err).ToNot(HaveOccurred()) - indexAndPublisher := eth2.NewIPLDPublisher(db) + indexAndPublisher := indexer.NewStateDiffIndexer(chainConfig, db) backend, err := eth.NewEthBackend(db, ð.Config{ ChainConfig: chainConfig, VmConfig: vm.Config{}, @@ -202,18 +217,29 @@ var _ = Describe("API", func() { }) Expect(err).ToNot(HaveOccurred()) api = eth.NewPublicEthAPI(backend, nil, false) - err = indexAndPublisher.Publish(test_helpers.MockConvertedPayload) + tx, err = indexAndPublisher.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty()) Expect(err).ToNot(HaveOccurred()) + for _, node := range test_helpers.MockStateNodes { + err = indexAndPublisher.PushStateNode(tx, node) + Expect(err).ToNot(HaveOccurred()) + } + + err = tx.Close(err) + Expect(err).ToNot(HaveOccurred()) + err = publishCode(db, test_helpers.ContractCodeHash, test_helpers.ContractCode) Expect(err).ToNot(HaveOccurred()) + uncles := test_helpers.MockBlock.Uncles() uncleHashes := make([]common.Hash, len(uncles)) for i, uncle := range uncles { uncleHashes[i] = uncle.Hash() } expectedBlock["uncles"] = uncleHashes + tx, err = indexAndPublisher.PushBlock(test_helpers.MockLondonBlock, test_helpers.MockLondonReceipts, test_helpers.MockLondonBlock.Difficulty()) + Expect(err).ToNot(HaveOccurred()) - err = indexAndPublisher.Publish(test_helpers.MockConvertedLondonPayload) + err = tx.Close(err) Expect(err).ToNot(HaveOccurred()) }) diff --git a/pkg/eth/backend.go b/pkg/eth/backend.go index 887cce677..264bbb94f 100644 --- a/pkg/eth/backend.go +++ b/pkg/eth/backend.go @@ -39,14 +39,12 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/statediff/indexer/postgres" "github.com/ethereum/go-ethereum/trie" + "github.com/ethereum/go-ethereum/statediff/indexer/ipfs" + "github.com/ethereum/go-ethereum/statediff/indexer/shared" ipfsethdb "github.com/vulcanize/ipfs-ethdb" - "github.com/vulcanize/ipld-eth-indexer/pkg/ipfs" - "github.com/vulcanize/ipld-eth-indexer/pkg/postgres" - shared2 "github.com/vulcanize/ipld-eth-indexer/pkg/shared" - - "github.com/vulcanize/ipld-eth-server/pkg/shared" ) var ( @@ -714,7 +712,7 @@ func (b *Backend) GetCodeByHash(ctx context.Context, address common.Address, has return nil, err } var mhKey string - mhKey, err = shared2.MultihashKeyFromKeccak256(common.BytesToHash(codeHash)) + mhKey, err = shared.MultihashKeyFromKeccak256(common.BytesToHash(codeHash)) if err != nil { return nil, err } diff --git a/pkg/eth/backend_utils.go b/pkg/eth/backend_utils.go index 65ea905d9..66e9eac10 100644 --- a/pkg/eth/backend_utils.go +++ b/pkg/eth/backend_utils.go @@ -30,8 +30,7 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" - - "github.com/vulcanize/ipld-eth-indexer/pkg/ipfs" + "github.com/ethereum/go-ethereum/statediff/indexer/ipfs" ) // RPCMarshalHeader converts the given header to the RPC output. diff --git a/pkg/eth/cid_retriever.go b/pkg/eth/cid_retriever.go index 49f1e1e78..d83ea6054 100644 --- a/pkg/eth/cid_retriever.go +++ b/pkg/eth/cid_retriever.go @@ -22,13 +22,12 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/statediff/indexer/models" + "github.com/ethereum/go-ethereum/statediff/indexer/postgres" "github.com/jmoiron/sqlx" "github.com/lib/pq" log "github.com/sirupsen/logrus" - "github.com/vulcanize/ipld-eth-indexer/pkg/eth" - "github.com/vulcanize/ipld-eth-indexer/pkg/postgres" - "github.com/vulcanize/ipld-eth-server/pkg/shared" ) @@ -86,7 +85,7 @@ func (ecr *CIDRetriever) Retrieve(filter SubscriptionSettings, blockNumber int64 }() // Retrieve cached header CIDs at this block height - var headers []eth.HeaderModel + var headers []models.HeaderModel headers, err = ecr.RetrieveHeaderCIDs(tx, blockNumber) if err != nil { log.Error("header cid retrieval error") @@ -102,7 +101,7 @@ func (ecr *CIDRetriever) Retrieve(filter SubscriptionSettings, blockNumber int64 empty = false if filter.HeaderFilter.Uncles { // Retrieve uncle cids for this header id - var uncleCIDs []eth.UncleModel + var uncleCIDs []models.UncleModel uncleCIDs, err = ecr.RetrieveUncleCIDsByHeaderID(tx, header.ID) if err != nil { log.Error("uncle cid retrieval error") @@ -166,18 +165,18 @@ func (ecr *CIDRetriever) Retrieve(filter SubscriptionSettings, blockNumber int64 } // RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight -func (ecr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]eth.HeaderModel, error) { +func (ecr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]models.HeaderModel, error) { log.Debug("retrieving header cids for block ", blockNumber) - headers := make([]eth.HeaderModel, 0) + headers := make([]models.HeaderModel, 0) pgStr := `SELECT * FROM eth.header_cids WHERE block_number = $1` return headers, tx.Select(&headers, pgStr, blockNumber) } // RetrieveUncleCIDsByHeaderID retrieves and returns all of the uncle cids for the provided header -func (ecr *CIDRetriever) RetrieveUncleCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]eth.UncleModel, error) { +func (ecr *CIDRetriever) RetrieveUncleCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]models.UncleModel, error) { log.Debug("retrieving uncle cids for block id ", headerID) - headers := make([]eth.UncleModel, 0) + headers := make([]models.UncleModel, 0) pgStr := `SELECT * FROM eth.uncle_cids WHERE header_id = $1` return headers, tx.Select(&headers, pgStr, headerID) @@ -185,10 +184,10 @@ func (ecr *CIDRetriever) RetrieveUncleCIDsByHeaderID(tx *sqlx.Tx, headerID int64 // RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters // also returns the ids for the returned transaction cids -func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID int64) ([]eth.TxModel, error) { +func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID int64) ([]models.TxModel, error) { log.Debug("retrieving transaction cids for header id ", headerID) args := make([]interface{}, 0, 3) - results := make([]eth.TxModel, 0) + results := make([]models.TxModel, 0) id := 1 pgStr := fmt.Sprintf(`SELECT transaction_cids.id, transaction_cids.header_id, transaction_cids.tx_hash, transaction_cids.cid, transaction_cids.mh_key, @@ -212,7 +211,7 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID // RetrieveRctCIDsByHeaderID retrieves and returns all of the rct cids at the provided header ID that conform to the provided // filter parameters and correspond to the provided tx ids -func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter ReceiptFilter, headerID int64, trxIds []int64) ([]eth.ReceiptModel, error) { +func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter ReceiptFilter, headerID int64, trxIds []int64) ([]models.ReceiptModel, error) { log.Debug("retrieving receipt cids for header id ", headerID) args := make([]interface{}, 0, 4) pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key, @@ -286,13 +285,13 @@ func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter Receip } } pgStr += ` ORDER BY transaction_cids.index` - receiptCids := make([]eth.ReceiptModel, 0) + receiptCids := make([]models.ReceiptModel, 0) return receiptCids, tx.Select(&receiptCids, pgStr, args...) } // RetrieveRctCIDs retrieves and returns all of the rct cids at the provided blockheight or block hash that conform to the provided // filter parameters and correspond to the provided tx ids -func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash, trxIds []int64) ([]eth.ReceiptModel, error) { +func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash, trxIds []int64) ([]models.ReceiptModel, error) { log.Debug("retrieving receipt cids for block ", blockNumber) args := make([]interface{}, 0, 5) pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key, @@ -316,7 +315,7 @@ func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, b // TODO: Add the below filters when we have log index in DB. if true { pgStr += ` ORDER BY transaction_cids.index` - receiptCids := make([]eth.ReceiptModel, 0) + receiptCids := make([]models.ReceiptModel, 0) return receiptCids, tx.Select(&receiptCids, pgStr, args...) } @@ -383,7 +382,7 @@ func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, b } pgStr += ` ORDER BY transaction_cids.index` - receiptCids := make([]eth.ReceiptModel, 0) + receiptCids := make([]models.ReceiptModel, 0) return receiptCids, tx.Select(&receiptCids, pgStr, args...) } @@ -397,7 +396,7 @@ func hasTopics(topics [][]string) bool { } // RetrieveStateCIDs retrieves and returns all of the state node cids at the provided header ID that conform to the provided filter parameters -func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter, headerID int64) ([]eth.StateNodeModel, error) { +func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter, headerID int64) ([]models.StateNodeModel, error) { log.Debug("retrieving state cids for header id ", headerID) args := make([]interface{}, 0, 2) pgStr := `SELECT state_cids.id, state_cids.header_id, @@ -417,12 +416,12 @@ func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter, if !stateFilter.IntermediateNodes { pgStr += ` AND state_cids.node_type = 2` } - stateNodeCIDs := make([]eth.StateNodeModel, 0) + stateNodeCIDs := make([]models.StateNodeModel, 0) return stateNodeCIDs, tx.Select(&stateNodeCIDs, pgStr, args...) } // RetrieveStorageCIDs retrieves and returns all of the storage node cids at the provided header id that conform to the provided filter parameters -func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageFilter, headerID int64) ([]eth.StorageNodeWithStateKeyModel, error) { +func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageFilter, headerID int64) ([]models.StorageNodeWithStateKeyModel, error) { log.Debug("retrieving storage cids for header id ", headerID) args := make([]interface{}, 0, 3) pgStr := `SELECT storage_cids.id, storage_cids.state_id, storage_cids.storage_leaf_key, storage_cids.node_type, @@ -450,18 +449,18 @@ func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageF if !storageFilter.IntermediateNodes { pgStr += ` AND storage_cids.node_type = 2` } - storageNodeCIDs := make([]eth.StorageNodeWithStateKeyModel, 0) + storageNodeCIDs := make([]models.StorageNodeWithStateKeyModel, 0) return storageNodeCIDs, tx.Select(&storageNodeCIDs, pgStr, args...) } // RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash -func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (eth.HeaderModel, []eth.UncleModel, []eth.TxModel, []eth.ReceiptModel, error) { +func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (models.HeaderModel, []models.UncleModel, []models.TxModel, []models.ReceiptModel, error) { log.Debug("retrieving block cids for block hash ", blockHash.String()) // Begin new db tx tx, err := ecr.db.Beginx() if err != nil { - return eth.HeaderModel{}, nil, nil, nil, err + return models.HeaderModel{}, nil, nil, nil, err } defer func() { if p := recover(); p != nil { @@ -474,29 +473,29 @@ func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (eth.HeaderM } }() - var headerCID eth.HeaderModel + var headerCID models.HeaderModel headerCID, err = ecr.RetrieveHeaderCIDByHash(tx, blockHash) if err != nil { log.Error("header cid retrieval error") - return eth.HeaderModel{}, nil, nil, nil, err + return models.HeaderModel{}, nil, nil, nil, err } - var uncleCIDs []eth.UncleModel + var uncleCIDs []models.UncleModel uncleCIDs, err = ecr.RetrieveUncleCIDsByHeaderID(tx, headerCID.ID) if err != nil { log.Error("uncle cid retrieval error") - return eth.HeaderModel{}, nil, nil, nil, err + return models.HeaderModel{}, nil, nil, nil, err } - var txCIDs []eth.TxModel + var txCIDs []models.TxModel txCIDs, err = ecr.RetrieveTxCIDsByHeaderID(tx, headerCID.ID) if err != nil { log.Error("tx cid retrieval error") - return eth.HeaderModel{}, nil, nil, nil, err + return models.HeaderModel{}, nil, nil, nil, err } txIDs := make([]int64, len(txCIDs)) for i, txCID := range txCIDs { txIDs[i] = txCID.ID } - var rctCIDs []eth.ReceiptModel + var rctCIDs []models.ReceiptModel rctCIDs, err = ecr.RetrieveReceiptCIDsByTxIDs(tx, txIDs) if err != nil { log.Error("rct cid retrieval error") @@ -505,13 +504,13 @@ func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (eth.HeaderM } // RetrieveBlockByNumber returns all of the CIDs needed to compose an entire block, for a given block number -func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (eth.HeaderModel, []eth.UncleModel, []eth.TxModel, []eth.ReceiptModel, error) { +func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (models.HeaderModel, []models.UncleModel, []models.TxModel, []models.ReceiptModel, error) { log.Debug("retrieving block cids for block number ", blockNumber) // Begin new db tx tx, err := ecr.db.Beginx() if err != nil { - return eth.HeaderModel{}, nil, nil, nil, err + return models.HeaderModel{}, nil, nil, nil, err } defer func() { if p := recover(); p != nil { @@ -524,32 +523,32 @@ func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (eth.HeaderMod } }() - var headerCID []eth.HeaderModel + var headerCID []models.HeaderModel headerCID, err = ecr.RetrieveHeaderCIDs(tx, blockNumber) if err != nil { log.Error("header cid retrieval error") - return eth.HeaderModel{}, nil, nil, nil, err + return models.HeaderModel{}, nil, nil, nil, err } if len(headerCID) < 1 { - return eth.HeaderModel{}, nil, nil, nil, fmt.Errorf("header cid retrieval error, no header CIDs found at block %d", blockNumber) + return models.HeaderModel{}, nil, nil, nil, fmt.Errorf("header cid retrieval error, no header CIDs found at block %d", blockNumber) } - var uncleCIDs []eth.UncleModel + var uncleCIDs []models.UncleModel uncleCIDs, err = ecr.RetrieveUncleCIDsByHeaderID(tx, headerCID[0].ID) if err != nil { log.Error("uncle cid retrieval error") - return eth.HeaderModel{}, nil, nil, nil, err + return models.HeaderModel{}, nil, nil, nil, err } - var txCIDs []eth.TxModel + var txCIDs []models.TxModel txCIDs, err = ecr.RetrieveTxCIDsByHeaderID(tx, headerCID[0].ID) if err != nil { log.Error("tx cid retrieval error") - return eth.HeaderModel{}, nil, nil, nil, err + return models.HeaderModel{}, nil, nil, nil, err } txIDs := make([]int64, len(txCIDs)) for i, txCID := range txCIDs { txIDs[i] = txCID.ID } - var rctCIDs []eth.ReceiptModel + var rctCIDs []models.ReceiptModel rctCIDs, err = ecr.RetrieveReceiptCIDsByTxIDs(tx, txIDs) if err != nil { log.Error("rct cid retrieval error") @@ -558,26 +557,26 @@ func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (eth.HeaderMod } // RetrieveHeaderCIDByHash returns the header for the given block hash -func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.Hash) (eth.HeaderModel, error) { +func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.Hash) (models.HeaderModel, error) { log.Debug("retrieving header cids for block hash ", blockHash.String()) pgStr := `SELECT * FROM eth.header_cids WHERE block_hash = $1` - var headerCID eth.HeaderModel + var headerCID models.HeaderModel return headerCID, tx.Get(&headerCID, pgStr, blockHash.String()) } // RetrieveTxCIDsByHeaderID retrieves all tx CIDs for the given header id -func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]eth.TxModel, error) { +func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]models.TxModel, error) { log.Debug("retrieving tx cids for block id ", headerID) pgStr := `SELECT * FROM eth.transaction_cids WHERE header_id = $1 ORDER BY index` - var txCIDs []eth.TxModel + var txCIDs []models.TxModel return txCIDs, tx.Select(&txCIDs, pgStr, headerID) } // RetrieveReceiptCIDsByTxIDs retrieves receipt CIDs by their associated tx IDs -func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64) ([]eth.ReceiptModel, error) { +func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64) ([]models.ReceiptModel, error) { log.Debugf("retrieving receipt cids for tx ids %v", txIDs) pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key, receipt_cids.contract, receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s, @@ -586,6 +585,6 @@ func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64) WHERE tx_id = ANY($1::INTEGER[]) AND receipt_cids.tx_id = transaction_cids.id ORDER BY transaction_cids.index` - var rctCIDs []eth.ReceiptModel + var rctCIDs []models.ReceiptModel return rctCIDs, tx.Select(&rctCIDs, pgStr, pq.Array(txIDs)) } diff --git a/pkg/eth/cid_retriever_test.go b/pkg/eth/cid_retriever_test.go index db6a426c1..414dbd475 100644 --- a/pkg/eth/cid_retriever_test.go +++ b/pkg/eth/cid_retriever_test.go @@ -19,17 +19,17 @@ package eth_test import ( "math/big" + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/statediff/indexer" + "github.com/ethereum/go-ethereum/statediff/indexer/models" + "github.com/ethereum/go-ethereum/statediff/indexer/postgres" "github.com/ethereum/go-ethereum/trie" - "github.com/vulcanize/ipld-eth-indexer/pkg/shared" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - eth2 "github.com/vulcanize/ipld-eth-indexer/pkg/eth" - "github.com/vulcanize/ipld-eth-indexer/pkg/postgres" - "github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers" ) @@ -211,15 +211,15 @@ var ( var _ = Describe("Retriever", func() { var ( - db *postgres.DB - repo *eth2.IPLDPublisher - retriever *eth.CIDRetriever + db *postgres.DB + diffIndexer *indexer.StateDiffIndexer + retriever *eth.CIDRetriever ) BeforeEach(func() { var err error - db, err = shared.SetupDB() + db, err = SetupDB() Expect(err).ToNot(HaveOccurred()) - repo = eth2.NewIPLDPublisher(db) + diffIndexer = indexer.NewStateDiffIndexer(params.TestChainConfig, db) retriever = eth.NewCIDRetriever(db) }) AfterEach(func() { @@ -228,7 +228,14 @@ var _ = Describe("Retriever", func() { Describe("Retrieve", func() { BeforeEach(func() { - err := repo.Publish(test_helpers.MockConvertedPayload) + tx, err := diffIndexer.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty()) + Expect(err).ToNot(HaveOccurred()) + for _, node := range test_helpers.MockStateNodes { + err = diffIndexer.PushStateNode(tx, node) + Expect(err).ToNot(HaveOccurred()) + } + + err = tx.Close(err) Expect(err).ToNot(HaveOccurred()) }) It("Retrieves all CIDs for the given blocknumber when provided an open filter", func() { @@ -277,7 +284,7 @@ var _ = Describe("Retriever", func() { Expect(empty).ToNot(BeTrue()) Expect(len(cids1)).To(Equal(1)) Expect(cids1[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber)) - Expect(cids1[0].Header).To(Equal(eth2.HeaderModel{})) + Expect(cids1[0].Header).To(Equal(models.HeaderModel{})) Expect(len(cids1[0].Transactions)).To(Equal(0)) Expect(len(cids1[0].StateNodes)).To(Equal(0)) Expect(len(cids1[0].StorageNodes)).To(Equal(0)) @@ -292,7 +299,7 @@ var _ = Describe("Retriever", func() { Expect(empty).ToNot(BeTrue()) Expect(len(cids2)).To(Equal(1)) Expect(cids2[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber)) - Expect(cids2[0].Header).To(Equal(eth2.HeaderModel{})) + Expect(cids2[0].Header).To(Equal(models.HeaderModel{})) Expect(len(cids2[0].Transactions)).To(Equal(0)) Expect(len(cids2[0].StateNodes)).To(Equal(0)) Expect(len(cids2[0].StorageNodes)).To(Equal(0)) @@ -307,7 +314,7 @@ var _ = Describe("Retriever", func() { Expect(empty).ToNot(BeTrue()) Expect(len(cids3)).To(Equal(1)) Expect(cids3[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber)) - Expect(cids3[0].Header).To(Equal(eth2.HeaderModel{})) + Expect(cids3[0].Header).To(Equal(models.HeaderModel{})) Expect(len(cids3[0].Transactions)).To(Equal(0)) Expect(len(cids3[0].StateNodes)).To(Equal(0)) Expect(len(cids3[0].StorageNodes)).To(Equal(0)) @@ -322,7 +329,7 @@ var _ = Describe("Retriever", func() { Expect(empty).ToNot(BeTrue()) Expect(len(cids4)).To(Equal(1)) Expect(cids4[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber)) - Expect(cids4[0].Header).To(Equal(eth2.HeaderModel{})) + Expect(cids4[0].Header).To(Equal(models.HeaderModel{})) Expect(len(cids4[0].Transactions)).To(Equal(0)) Expect(len(cids4[0].StateNodes)).To(Equal(0)) Expect(len(cids4[0].StorageNodes)).To(Equal(0)) @@ -337,7 +344,7 @@ var _ = Describe("Retriever", func() { Expect(empty).ToNot(BeTrue()) Expect(len(cids5)).To(Equal(1)) Expect(cids5[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber)) - Expect(cids5[0].Header).To(Equal(eth2.HeaderModel{})) + Expect(cids5[0].Header).To(Equal(models.HeaderModel{})) Expect(len(cids5[0].Transactions)).To(Equal(3)) Expect(eth.TxModelsContainsCID(cids5[0].Transactions, test_helpers.Trx1CID.String())).To(BeTrue()) Expect(eth.TxModelsContainsCID(cids5[0].Transactions, test_helpers.Trx2CID.String())).To(BeTrue()) @@ -354,7 +361,7 @@ var _ = Describe("Retriever", func() { Expect(empty).ToNot(BeTrue()) Expect(len(cids6)).To(Equal(1)) Expect(cids6[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber)) - Expect(cids6[0].Header).To(Equal(eth2.HeaderModel{})) + Expect(cids6[0].Header).To(Equal(models.HeaderModel{})) Expect(len(cids6[0].Transactions)).To(Equal(1)) expectedTxCID := test_helpers.MockCIDWrapper.Transactions[1] expectedTxCID.ID = cids6[0].Transactions[0].ID @@ -373,12 +380,12 @@ var _ = Describe("Retriever", func() { Expect(empty).ToNot(BeTrue()) Expect(len(cids7)).To(Equal(1)) Expect(cids7[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber)) - Expect(cids7[0].Header).To(Equal(eth2.HeaderModel{})) + Expect(cids7[0].Header).To(Equal(models.HeaderModel{})) Expect(len(cids7[0].Transactions)).To(Equal(0)) Expect(len(cids7[0].Receipts)).To(Equal(0)) Expect(len(cids7[0].StorageNodes)).To(Equal(0)) Expect(len(cids7[0].StateNodes)).To(Equal(1)) - Expect(cids7[0].StateNodes[0]).To(Equal(eth2.StateNodeModel{ + Expect(cids7[0].StateNodes[0]).To(Equal(models.StateNodeModel{ ID: cids7[0].StateNodes[0].ID, HeaderID: cids7[0].StateNodes[0].HeaderID, NodeType: 2, @@ -400,8 +407,12 @@ var _ = Describe("Retriever", func() { Expect(err).To(HaveOccurred()) }) It("Gets the number of the first block that has data in the database", func() { - err := repo.Publish(test_helpers.MockConvertedPayload) + tx, err := diffIndexer.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty()) + Expect(err).ToNot(HaveOccurred()) + + err = tx.Close(err) Expect(err).ToNot(HaveOccurred()) + num, err := retriever.RetrieveFirstBlockNumber() Expect(err).ToNot(HaveOccurred()) Expect(num).To(Equal(int64(1))) @@ -410,8 +421,12 @@ var _ = Describe("Retriever", func() { It("Gets the number of the first block that has data in the database", func() { payload := test_helpers.MockConvertedPayload payload.Block = newMockBlock(1010101) - err := repo.Publish(payload) + tx, err := diffIndexer.PushBlock(payload.Block, payload.Receipts, payload.Block.Difficulty()) + Expect(err).ToNot(HaveOccurred()) + + err = tx.Close(err) Expect(err).ToNot(HaveOccurred()) + num, err := retriever.RetrieveFirstBlockNumber() Expect(err).ToNot(HaveOccurred()) Expect(num).To(Equal(int64(1010101))) @@ -422,10 +437,16 @@ var _ = Describe("Retriever", func() { payload1.Block = newMockBlock(1010101) payload2 := payload1 payload2.Block = newMockBlock(5) - err := repo.Publish(payload1) + tx, err := diffIndexer.PushBlock(payload1.Block, payload1.Receipts, payload1.Block.Difficulty()) + Expect(err).ToNot(HaveOccurred()) + err = tx.Close(err) Expect(err).ToNot(HaveOccurred()) - err = repo.Publish(payload2) + + tx, err = diffIndexer.PushBlock(payload2.Block, payload2.Receipts, payload2.Block.Difficulty()) Expect(err).ToNot(HaveOccurred()) + err = tx.Close(err) + Expect(err).ToNot(HaveOccurred()) + num, err := retriever.RetrieveFirstBlockNumber() Expect(err).ToNot(HaveOccurred()) Expect(num).To(Equal(int64(5))) @@ -438,8 +459,11 @@ var _ = Describe("Retriever", func() { Expect(err).To(HaveOccurred()) }) It("Gets the number of the latest block that has data in the database", func() { - err := repo.Publish(test_helpers.MockConvertedPayload) + tx, err := diffIndexer.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty()) + Expect(err).ToNot(HaveOccurred()) + err = tx.Close(err) Expect(err).ToNot(HaveOccurred()) + num, err := retriever.RetrieveLastBlockNumber() Expect(err).ToNot(HaveOccurred()) Expect(num).To(Equal(int64(1))) @@ -448,8 +472,12 @@ var _ = Describe("Retriever", func() { It("Gets the number of the latest block that has data in the database", func() { payload := test_helpers.MockConvertedPayload payload.Block = newMockBlock(1010101) - err := repo.Publish(payload) + tx, err := diffIndexer.PushBlock(payload.Block, payload.Receipts, payload.Block.Difficulty()) + Expect(err).ToNot(HaveOccurred()) + + err = tx.Close(err) Expect(err).ToNot(HaveOccurred()) + num, err := retriever.RetrieveLastBlockNumber() Expect(err).ToNot(HaveOccurred()) Expect(num).To(Equal(int64(1010101))) @@ -460,10 +488,16 @@ var _ = Describe("Retriever", func() { payload1.Block = newMockBlock(1010101) payload2 := payload1 payload2.Block = newMockBlock(5) - err := repo.Publish(payload1) + tx, err := diffIndexer.PushBlock(payload1.Block, payload1.Receipts, payload1.Block.Difficulty()) + Expect(err).ToNot(HaveOccurred()) + err = tx.Close(err) Expect(err).ToNot(HaveOccurred()) - err = repo.Publish(payload2) + + tx, err = diffIndexer.PushBlock(payload2.Block, payload2.Receipts, payload2.Block.Difficulty()) Expect(err).ToNot(HaveOccurred()) + err = tx.Close(err) + Expect(err).ToNot(HaveOccurred()) + num, err := retriever.RetrieveLastBlockNumber() Expect(err).ToNot(HaveOccurred()) Expect(num).To(Equal(int64(1010101))) diff --git a/pkg/eth/eth_state_test.go b/pkg/eth/eth_state_test.go index d824be19c..c7394aaf7 100644 --- a/pkg/eth/eth_state_test.go +++ b/pkg/eth/eth_state_test.go @@ -29,15 +29,14 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/statediff" + "github.com/ethereum/go-ethereum/statediff/indexer" + "github.com/ethereum/go-ethereum/statediff/indexer/postgres" + sdtypes "github.com/ethereum/go-ethereum/statediff/types" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - eth2 "github.com/vulcanize/ipld-eth-indexer/pkg/eth" - "github.com/vulcanize/ipld-eth-indexer/pkg/postgres" - "github.com/vulcanize/ipld-eth-indexer/pkg/shared" "github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers" ) @@ -74,9 +73,9 @@ var _ = Describe("eth state reading tests", func() { It("test init", func() { // db and type initializations var err error - db, err = shared.SetupDB() + db, err = SetupDB() Expect(err).ToNot(HaveOccurred()) - transformer := eth2.NewStateDiffTransformer(chainConfig, db) + transformer := indexer.NewStateDiffIndexer(chainConfig, db) backend, err = eth.NewEthBackend(db, ð.Config{ ChainConfig: chainConfig, VmConfig: vm.Config{}, @@ -135,31 +134,39 @@ var _ = Describe("eth state reading tests", func() { } diff, err := builder.BuildStateDiffObject(args, params) Expect(err).ToNot(HaveOccurred()) - diffRlp, err := rlp.EncodeToBytes(diff) + tx, err := transformer.PushBlock(block, rcts, mockTD) Expect(err).ToNot(HaveOccurred()) - blockRlp, err := rlp.EncodeToBytes(block) - Expect(err).ToNot(HaveOccurred()) - receiptsRlp, err := rlp.EncodeToBytes(rcts) - Expect(err).ToNot(HaveOccurred()) - payload := statediff.Payload{ - StateObjectRlp: diffRlp, - BlockRlp: blockRlp, - ReceiptsRlp: receiptsRlp, - TotalDifficulty: mockTD, + + for _, node := range diff.Nodes { + err = transformer.PushStateNode(tx, node) + Expect(err).ToNot(HaveOccurred()) } - _, err = transformer.Transform(0, payload) + err = tx.Close(err) Expect(err).ToNot(HaveOccurred()) } // Insert some non-canonical data into the database so that we test our ability to discern canonicity - indexAndPublisher := eth2.NewIPLDPublisher(db) - api = eth.NewPublicEthAPI(backend, nil, false) - err = indexAndPublisher.Publish(test_helpers.MockConvertedPayload) + indexAndPublisher := indexer.NewStateDiffIndexer(chainConfig, db) + + tx, err := indexAndPublisher.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty()) + Expect(err).ToNot(HaveOccurred()) + + err = tx.Close(err) Expect(err).ToNot(HaveOccurred()) + // The non-canonical header has a child - err = indexAndPublisher.Publish(test_helpers.MockConvertedPayloadForChild) + tx, err = indexAndPublisher.PushBlock(test_helpers.MockChild, test_helpers.MockReceipts, test_helpers.MockChild.Difficulty()) + Expect(err).ToNot(HaveOccurred()) + + hash := sdtypes.CodeAndCodeHash{ + Hash: test_helpers.CodeHash, + Code: test_helpers.ContractCode, + } + + err = indexAndPublisher.PushCodeAndCodeHash(tx, hash) Expect(err).ToNot(HaveOccurred()) - err = publishCode(db, test_helpers.ContractCodeHash, test_helpers.ContractCode) + + err = tx.Close(err) Expect(err).ToNot(HaveOccurred()) }) defer It("test teardown", func() { diff --git a/pkg/eth/filterer.go b/pkg/eth/filterer.go index 2c920bcba..6bea92c53 100644 --- a/pkg/eth/filterer.go +++ b/pkg/eth/filterer.go @@ -19,6 +19,7 @@ package eth import ( "bytes" + "github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld" sdtypes "github.com/ethereum/go-ethereum/statediff/types" "github.com/ethereum/go-ethereum/common" @@ -27,14 +28,12 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/multiformats/go-multihash" - "github.com/vulcanize/ipld-eth-indexer/pkg/eth" - "github.com/vulcanize/ipld-eth-indexer/pkg/ipfs" - "github.com/vulcanize/ipld-eth-indexer/pkg/ipfs/ipld" + "github.com/ethereum/go-ethereum/statediff/indexer/ipfs" ) // Filterer interface for substituing mocks in tests type Filterer interface { - Filter(filter SubscriptionSettings, payload eth.ConvertedPayload) (*IPLDs, error) + Filter(filter SubscriptionSettings, payload ConvertedPayload) (*IPLDs, error) } // ResponseFilterer satisfies the ResponseFilterer interface for ethereum @@ -46,7 +45,7 @@ func NewResponseFilterer() *ResponseFilterer { } // Filter is used to filter through eth data to extract and package requested data into a Payload -func (s *ResponseFilterer) Filter(filter SubscriptionSettings, payload eth.ConvertedPayload) (*IPLDs, error) { +func (s *ResponseFilterer) Filter(filter SubscriptionSettings, payload ConvertedPayload) (*IPLDs, error) { if checkRange(filter.Start.Int64(), filter.End.Int64(), payload.Block.Number().Int64()) { response := new(IPLDs) response.TotalDifficulty = payload.TotalDifficulty @@ -73,7 +72,7 @@ func (s *ResponseFilterer) Filter(filter SubscriptionSettings, payload eth.Conve return nil, nil } -func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *IPLDs, payload eth.ConvertedPayload) error { +func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *IPLDs, payload ConvertedPayload) error { if !headerFilter.Off { headerRLP, err := rlp.EncodeToBytes(payload.Block.Header()) if err != nil { @@ -115,7 +114,7 @@ func checkRange(start, end, actual int64) bool { return false } -func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *IPLDs, payload eth.ConvertedPayload) ([]common.Hash, error) { +func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *IPLDs, payload ConvertedPayload) ([]common.Hash, error) { var trxHashes []common.Hash if !trxFilter.Off { trxLen := len(payload.Block.Body().Transactions) @@ -163,7 +162,7 @@ func checkTransactionAddrs(wantedSrc, wantedDst []string, actualSrc, actualDst s return false } -func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response *IPLDs, payload eth.ConvertedPayload, trxHashes []common.Hash) error { +func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response *IPLDs, payload ConvertedPayload, trxHashes []common.Hash) error { if !receiptFilter.Off { response.Receipts = make([]ipfs.BlockModel, 0, len(payload.Receipts)) for i, receipt := range payload.Receipts { @@ -253,7 +252,7 @@ func slicesShareString(slice1, slice2 []string) int { } // filterStateAndStorage filters state and storage nodes into the response according to the provided filters -func (s *ResponseFilterer) filterStateAndStorage(stateFilter StateFilter, storageFilter StorageFilter, response *IPLDs, payload eth.ConvertedPayload) error { +func (s *ResponseFilterer) filterStateAndStorage(stateFilter StateFilter, storageFilter StorageFilter, response *IPLDs, payload ConvertedPayload) error { response.StateNodes = make([]StateNode, 0, len(payload.StateNodes)) response.StorageNodes = make([]StorageNode, 0) stateAddressFilters := make([]common.Hash, len(stateFilter.Addresses)) @@ -270,37 +269,37 @@ func (s *ResponseFilterer) filterStateAndStorage(stateFilter StateFilter, storag } for _, stateNode := range payload.StateNodes { if !stateFilter.Off && checkNodeKeys(stateAddressFilters, stateNode.LeafKey) { - if stateNode.Type == sdtypes.Leaf || stateFilter.IntermediateNodes { - cid, err := ipld.RawdataToCid(ipld.MEthStateTrie, stateNode.Value, multihash.KECCAK_256) + if stateNode.NodeType == sdtypes.Leaf || stateFilter.IntermediateNodes { + cid, err := ipld.RawdataToCid(ipld.MEthStateTrie, stateNode.NodeValue, multihash.KECCAK_256) if err != nil { return err } response.StateNodes = append(response.StateNodes, StateNode{ - StateLeafKey: stateNode.LeafKey, + StateLeafKey: common.BytesToHash(stateNode.LeafKey), Path: stateNode.Path, IPLD: ipfs.BlockModel{ - Data: stateNode.Value, + Data: stateNode.NodeValue, CID: cid.String(), }, - Type: stateNode.Type, + Type: stateNode.NodeType, }) } } if !storageFilter.Off && checkNodeKeys(storageAddressFilters, stateNode.LeafKey) { for _, storageNode := range payload.StorageNodes[common.Bytes2Hex(stateNode.Path)] { if checkNodeKeys(storageKeyFilters, storageNode.LeafKey) { - cid, err := ipld.RawdataToCid(ipld.MEthStorageTrie, storageNode.Value, multihash.KECCAK_256) + cid, err := ipld.RawdataToCid(ipld.MEthStorageTrie, storageNode.NodeValue, multihash.KECCAK_256) if err != nil { return err } response.StorageNodes = append(response.StorageNodes, StorageNode{ - StateLeafKey: stateNode.LeafKey, - StorageLeafKey: storageNode.LeafKey, + StateLeafKey: common.BytesToHash(stateNode.LeafKey), + StorageLeafKey: common.BytesToHash(storageNode.LeafKey), IPLD: ipfs.BlockModel{ - Data: storageNode.Value, + Data: storageNode.NodeValue, CID: cid.String(), }, - Type: storageNode.Type, + Type: storageNode.NodeType, Path: storageNode.Path, }) } @@ -310,13 +309,13 @@ func (s *ResponseFilterer) filterStateAndStorage(stateFilter StateFilter, storag return nil } -func checkNodeKeys(wantedKeys []common.Hash, actualKey common.Hash) bool { +func checkNodeKeys(wantedKeys []common.Hash, actualKey []byte) bool { // If we aren't filtering for any specific keys, all nodes are a go if len(wantedKeys) == 0 { return true } for _, key := range wantedKeys { - if bytes.Equal(key.Bytes(), actualKey.Bytes()) { + if bytes.Equal(key.Bytes(), actualKey) { return true } } diff --git a/pkg/eth/filterer_test.go b/pkg/eth/filterer_test.go index bcf6dd25f..cfa64a3b4 100644 --- a/pkg/eth/filterer_test.go +++ b/pkg/eth/filterer_test.go @@ -19,13 +19,12 @@ package eth_test import ( "bytes" + "github.com/ethereum/go-ethereum/statediff/indexer/ipfs" sdtypes "github.com/ethereum/go-ethereum/statediff/types" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/ipld-eth-indexer/pkg/ipfs" - "github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers" "github.com/vulcanize/ipld-eth-server/pkg/shared" diff --git a/pkg/eth/ipld_fetcher.go b/pkg/eth/ipld_fetcher.go index e85921ad4..57e26d6f2 100644 --- a/pkg/eth/ipld_fetcher.go +++ b/pkg/eth/ipld_fetcher.go @@ -22,13 +22,12 @@ import ( "math/big" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/statediff/indexer/models" + "github.com/ethereum/go-ethereum/statediff/indexer/postgres" "github.com/jmoiron/sqlx" log "github.com/sirupsen/logrus" - "github.com/vulcanize/ipld-eth-indexer/pkg/eth" - "github.com/vulcanize/ipld-eth-indexer/pkg/ipfs" - "github.com/vulcanize/ipld-eth-indexer/pkg/postgres" - + "github.com/ethereum/go-ethereum/statediff/indexer/ipfs" "github.com/vulcanize/ipld-eth-server/pkg/shared" ) @@ -104,7 +103,7 @@ func (f *IPLDFetcher) Fetch(cids CIDWrapper) (*IPLDs, error) { } // FetchHeaders fetches headers -func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c eth.HeaderModel) (ipfs.BlockModel, error) { +func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c models.HeaderModel) (ipfs.BlockModel, error) { log.Debug("fetching header ipld") headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) if err != nil { @@ -117,7 +116,7 @@ func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c eth.HeaderModel) (ipfs.BlockMod } // FetchUncles fetches uncles -func (f *IPLDFetcher) FetchUncles(tx *sqlx.Tx, cids []eth.UncleModel) ([]ipfs.BlockModel, error) { +func (f *IPLDFetcher) FetchUncles(tx *sqlx.Tx, cids []models.UncleModel) ([]ipfs.BlockModel, error) { log.Debug("fetching uncle iplds") uncleIPLDs := make([]ipfs.BlockModel, len(cids)) for i, c := range cids { @@ -134,7 +133,7 @@ func (f *IPLDFetcher) FetchUncles(tx *sqlx.Tx, cids []eth.UncleModel) ([]ipfs.Bl } // FetchTrxs fetches transactions -func (f *IPLDFetcher) FetchTrxs(tx *sqlx.Tx, cids []eth.TxModel) ([]ipfs.BlockModel, error) { +func (f *IPLDFetcher) FetchTrxs(tx *sqlx.Tx, cids []models.TxModel) ([]ipfs.BlockModel, error) { log.Debug("fetching transaction iplds") trxIPLDs := make([]ipfs.BlockModel, len(cids)) for i, c := range cids { @@ -151,7 +150,7 @@ func (f *IPLDFetcher) FetchTrxs(tx *sqlx.Tx, cids []eth.TxModel) ([]ipfs.BlockMo } // FetchRcts fetches receipts -func (f *IPLDFetcher) FetchRcts(tx *sqlx.Tx, cids []eth.ReceiptModel) ([]ipfs.BlockModel, error) { +func (f *IPLDFetcher) FetchRcts(tx *sqlx.Tx, cids []models.ReceiptModel) ([]ipfs.BlockModel, error) { log.Debug("fetching receipt iplds") rctIPLDs := make([]ipfs.BlockModel, len(cids)) for i, c := range cids { @@ -168,7 +167,7 @@ func (f *IPLDFetcher) FetchRcts(tx *sqlx.Tx, cids []eth.ReceiptModel) ([]ipfs.Bl } // FetchState fetches state nodes -func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []eth.StateNodeModel) ([]StateNode, error) { +func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []models.StateNodeModel) ([]StateNode, error) { log.Debug("fetching state iplds") stateNodes := make([]StateNode, 0, len(cids)) for _, stateNode := range cids { @@ -193,7 +192,7 @@ func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []eth.StateNodeModel) ([]Stat } // FetchStorage fetches storage nodes -func (f *IPLDFetcher) FetchStorage(tx *sqlx.Tx, cids []eth.StorageNodeWithStateKeyModel) ([]StorageNode, error) { +func (f *IPLDFetcher) FetchStorage(tx *sqlx.Tx, cids []models.StorageNodeWithStateKeyModel) ([]StorageNode, error) { log.Debug("fetching storage iplds") storageNodes := make([]StorageNode, 0, len(cids)) for _, storageNode := range cids { diff --git a/pkg/eth/ipld_fetcher_test.go b/pkg/eth/ipld_fetcher_test.go index b12b43761..66dcc17a9 100644 --- a/pkg/eth/ipld_fetcher_test.go +++ b/pkg/eth/ipld_fetcher_test.go @@ -17,12 +17,11 @@ package eth_test import ( + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/statediff/indexer" + "github.com/ethereum/go-ethereum/statediff/indexer/postgres" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/ipld-eth-indexer/pkg/shared" - - eth2 "github.com/vulcanize/ipld-eth-indexer/pkg/eth" - "github.com/vulcanize/ipld-eth-indexer/pkg/postgres" "github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers" @@ -31,18 +30,28 @@ import ( var _ = Describe("IPLDFetcher", func() { var ( db *postgres.DB - pubAndIndexer *eth2.IPLDPublisher + pubAndIndexer *indexer.StateDiffIndexer fetcher *eth.IPLDFetcher ) Describe("Fetch", func() { BeforeEach(func() { - var err error - db, err = shared.SetupDB() + var ( + err error + tx *indexer.BlockTx + ) + db, err = SetupDB() Expect(err).ToNot(HaveOccurred()) - pubAndIndexer = eth2.NewIPLDPublisher(db) - err = pubAndIndexer.Publish(test_helpers.MockConvertedPayload) + pubAndIndexer = indexer.NewStateDiffIndexer(params.TestChainConfig, db) + tx, err = pubAndIndexer.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty()) + for _, node := range test_helpers.MockStateNodes { + err = pubAndIndexer.PushStateNode(tx, node) + Expect(err).ToNot(HaveOccurred()) + } + + err = tx.Close(err) Expect(err).ToNot(HaveOccurred()) fetcher = eth.NewIPLDFetcher(db) + }) AfterEach(func() { eth.TearDownDB(db) diff --git a/pkg/eth/ipld_retriever.go b/pkg/eth/ipld_retriever.go index 7442756d8..0db3c1ff8 100644 --- a/pkg/eth/ipld_retriever.go +++ b/pkg/eth/ipld_retriever.go @@ -22,9 +22,8 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/statediff/indexer/postgres" "github.com/lib/pq" - - "github.com/vulcanize/ipld-eth-indexer/pkg/postgres" ) const ( diff --git a/pkg/eth/test_helpers.go b/pkg/eth/test_helpers.go index c39ec5c3f..97c10d05b 100644 --- a/pkg/eth/test_helpers.go +++ b/pkg/eth/test_helpers.go @@ -17,10 +17,9 @@ package eth import ( + "github.com/ethereum/go-ethereum/statediff/indexer/models" + "github.com/ethereum/go-ethereum/statediff/indexer/postgres" . "github.com/onsi/gomega" - - "github.com/vulcanize/ipld-eth-indexer/pkg/eth" - "github.com/vulcanize/ipld-eth-indexer/pkg/postgres" ) // TearDownDB is used to tear down the watcher dbs after tests @@ -46,7 +45,7 @@ func TearDownDB(db *postgres.DB) { } // TxModelsContainsCID used to check if a list of TxModels contains a specific cid string -func TxModelsContainsCID(txs []eth.TxModel, cid string) bool { +func TxModelsContainsCID(txs []models.TxModel, cid string) bool { for _, tx := range txs { if tx.CID == cid { return true @@ -56,7 +55,7 @@ func TxModelsContainsCID(txs []eth.TxModel, cid string) bool { } // ListContainsBytes used to check if a list of byte arrays contains a particular byte array -func ReceiptModelsContainsCID(rcts []eth.ReceiptModel, cid string) bool { +func ReceiptModelsContainsCID(rcts []models.ReceiptModel, cid string) bool { for _, rct := range rcts { if rct.CID == cid { return true diff --git a/pkg/eth/test_helpers/test_data.go b/pkg/eth/test_helpers/test_data.go index 4c88e796b..167cc2048 100644 --- a/pkg/eth/test_helpers/test_data.go +++ b/pkg/eth/test_helpers/test_data.go @@ -23,11 +23,14 @@ import ( "crypto/rand" "math/big" + "github.com/ethereum/go-ethereum/statediff/indexer" + "github.com/ethereum/go-ethereum/statediff/indexer/ipfs" + "github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld" + "github.com/ethereum/go-ethereum/statediff/indexer/models" + "github.com/ethereum/go-ethereum/statediff/indexer/shared" sdtypes "github.com/ethereum/go-ethereum/statediff/types" "github.com/ethereum/go-ethereum/trie" - "github.com/vulcanize/ipld-eth-indexer/pkg/shared" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" @@ -39,11 +42,7 @@ import ( "github.com/multiformats/go-multihash" log "github.com/sirupsen/logrus" - "github.com/vulcanize/ipld-eth-indexer/pkg/eth" - "github.com/vulcanize/ipld-eth-indexer/pkg/ipfs" - "github.com/vulcanize/ipld-eth-indexer/pkg/ipfs/ipld" - - eth2 "github.com/vulcanize/ipld-eth-server/pkg/eth" + "github.com/vulcanize/ipld-eth-server/pkg/eth" ) // Test variables @@ -149,7 +148,7 @@ var ( State2MhKey = shared.MultihashKeyFromCID(State2CID) StorageCID, _ = ipld.RawdataToCid(ipld.MEthStorageTrie, StorageLeafNode, multihash.KECCAK_256) StorageMhKey = shared.MultihashKeyFromCID(StorageCID) - MockTrxMeta = []eth.TxModel{ + MockTrxMeta = []models.TxModel{ { CID: "", // This is empty until we go to publish to ipfs MhKey: "", @@ -178,7 +177,7 @@ var ( Data: MockContractByteCode, }, } - MockTrxMetaPostPublsh = []eth.TxModel{ + MockTrxMetaPostPublsh = []models.TxModel{ { CID: Trx1CID.String(), // This is empty until we go to publish to ipfs MhKey: Trx1MhKey, @@ -207,7 +206,7 @@ var ( Data: MockContractByteCode, }, } - MockRctMeta = []eth.ReceiptModel{ + MockRctMeta = []models.ReceiptModel{ { CID: "", MhKey: "", @@ -246,7 +245,7 @@ var ( LogContracts: []string{}, }, } - MockRctMetaPostPublish = []eth.ReceiptModel{ + MockRctMetaPostPublish = []models.ReceiptModel{ { CID: Rct1CID.String(), MhKey: Rct1MhKey, @@ -331,21 +330,30 @@ var ( Account, }) - MockStateNodes = []eth.TrieNode{ + MockStateNodes = []sdtypes.StateNode{ { - LeafKey: common.BytesToHash(ContractLeafKey), - Path: []byte{'\x06'}, - Value: ContractLeafNode, - Type: sdtypes.Leaf, + LeafKey: ContractLeafKey, + Path: []byte{'\x06'}, + NodeValue: ContractLeafNode, + NodeType: sdtypes.Leaf, + StorageNodes: []sdtypes.StorageNode{ + { + Path: []byte{}, + NodeType: sdtypes.Leaf, + LeafKey: StorageLeafKey, + NodeValue: StorageLeafNode, + }, + }, }, { - LeafKey: common.BytesToHash(AccountLeafKey), - Path: []byte{'\x0c'}, - Value: AccountLeafNode, - Type: sdtypes.Leaf, + LeafKey: AccountLeafKey, + Path: []byte{'\x0c'}, + NodeValue: AccountLeafNode, + NodeType: sdtypes.Leaf, + StorageNodes: []sdtypes.StorageNode{}, }, } - MockStateMetaPostPublish = []eth.StateNodeModel{ + MockStateMetaPostPublish = []models.StateNodeModel{ { CID: State1CID.String(), MhKey: State1MhKey, @@ -361,13 +369,13 @@ var ( StateKey: common.BytesToHash(AccountLeafKey).Hex(), }, } - MockStorageNodes = map[string][]eth.TrieNode{ + MockStorageNodes = map[string][]sdtypes.StorageNode{ contractPath: { { - LeafKey: common.BytesToHash(StorageLeafKey), - Value: StorageLeafNode, - Type: sdtypes.Leaf, - Path: []byte{}, + LeafKey: StorageLeafKey, + NodeValue: StorageLeafNode, + NodeType: sdtypes.Leaf, + Path: []byte{}, }, }, } @@ -391,11 +399,11 @@ var ( StateNodes: MockStateNodes, } - Reward = eth.CalcEthBlockReward(MockBlock.Header(), MockBlock.Uncles(), MockBlock.Transactions(), MockReceipts) + Reward = indexer.CalcEthBlockReward(MockBlock.Header(), MockBlock.Uncles(), MockBlock.Transactions(), MockReceipts) - MockCIDWrapper = ð2.CIDWrapper{ + MockCIDWrapper = ð.CIDWrapper{ BlockNumber: new(big.Int).Set(BlockNumber), - Header: eth.HeaderModel{ + Header: models.HeaderModel{ BlockNumber: "1", BlockHash: MockBlock.Hash().String(), ParentHash: "0x0000000000000000000000000000000000000000000000000000000000000000", @@ -413,9 +421,9 @@ var ( }, Transactions: MockTrxMetaPostPublsh, Receipts: MockRctMetaPostPublish, - Uncles: []eth.UncleModel{}, + Uncles: []models.UncleModel{}, StateNodes: MockStateMetaPostPublish, - StorageNodes: []eth.StorageNodeWithStateKeyModel{ + StorageNodes: []models.StorageNodeWithStateKeyModel{ { Path: []byte{}, CID: StorageCID.String(), @@ -438,7 +446,7 @@ var ( State2IPLD, _ = blocks.NewBlockWithCid(AccountLeafNode, State2CID) StorageIPLD, _ = blocks.NewBlockWithCid(StorageLeafNode, StorageCID) - MockIPLDs = eth2.IPLDs{ + MockIPLDs = eth.IPLDs{ BlockNumber: new(big.Int).Set(BlockNumber), Header: ipfs.BlockModel{ Data: HeaderIPLD.RawData(), @@ -472,7 +480,7 @@ var ( CID: Rct3IPLD.Cid().String(), }, }, - StateNodes: []eth2.StateNode{ + StateNodes: []eth.StateNode{ { StateLeafKey: common.BytesToHash(ContractLeafKey), Type: sdtypes.Leaf, @@ -492,7 +500,7 @@ var ( Path: []byte{'\x0c'}, }, }, - StorageNodes: []eth2.StorageNode{ + StorageNodes: []eth.StorageNode{ { StateLeafKey: common.BytesToHash(ContractLeafKey), StorageLeafKey: common.BytesToHash(StorageLeafKey), @@ -518,7 +526,7 @@ var ( MockLondonTransactions, MockLondonReceipts, SenderAdd = createDynamicTransactionsAndReceipts(LondonBlockNum) MockLondonBlock = createNewBlock(&MockLondonHeader, MockLondonTransactions, nil, MockLondonReceipts, new(trie.Trie)) - MockLondonTrxMeta = []eth.TxModel{ + MockLondonTrxMeta = []models.TxModel{ { CID: "", // This is empty until we go to publish to ipfs MhKey: "", @@ -529,7 +537,7 @@ var ( Data: []byte{}, }, } - MockLondonRctMeta = []eth.ReceiptModel{ + MockLondonRctMeta = []models.ReceiptModel{ { CID: "", MhKey: "", @@ -605,7 +613,7 @@ func createDynamicTransactionsAndReceipts(blockNumber *big.Int) (types.Transacti // TODO: Change the receipt type to DynamicFeeTxType once this PR is merged. // https://github.com/ethereum/go-ethereum/pull/22806 mockReceipt1 := &types.Receipt{ - Type: types.AccessListTxType, + Type: types.DynamicFeeTxType, PostState: common.HexToHash("0x1").Bytes(), Status: types.ReceiptStatusSuccessful, CumulativeGasUsed: 50, diff --git a/pkg/eth/types.go b/pkg/eth/types.go index ce5b256b9..1541c913a 100644 --- a/pkg/eth/types.go +++ b/pkg/eth/types.go @@ -22,9 +22,10 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" + + "github.com/ethereum/go-ethereum/statediff/indexer/ipfs" + "github.com/ethereum/go-ethereum/statediff/indexer/models" sdtypes "github.com/ethereum/go-ethereum/statediff/types" - "github.com/vulcanize/ipld-eth-indexer/pkg/eth" - "github.com/vulcanize/ipld-eth-indexer/pkg/ipfs" ) // RPCTransaction represents a transaction that will serialize to the RPC representation of a transaction @@ -146,10 +147,23 @@ type StorageNode struct { // Passed to IPLDFetcher type CIDWrapper struct { BlockNumber *big.Int - Header eth.HeaderModel - Uncles []eth.UncleModel - Transactions []eth.TxModel - Receipts []eth.ReceiptModel - StateNodes []eth.StateNodeModel - StorageNodes []eth.StorageNodeWithStateKeyModel + Header models.HeaderModel + Uncles []models.UncleModel + Transactions []models.TxModel + Receipts []models.ReceiptModel + StateNodes []models.StateNodeModel + StorageNodes []models.StorageNodeWithStateKeyModel +} + +// ConvertedPayload is a custom type which packages raw ETH data for publishing to IPFS and filtering to subscribers +// Returned by PayloadConverter +// Passed to IPLDPublisher and ResponseFilterer +type ConvertedPayload struct { + TotalDifficulty *big.Int + Block *types.Block + TxMetaData []models.TxModel + Receipts types.Receipts + ReceiptMetaData []models.ReceiptModel + StateNodes []sdtypes.StateNode + StorageNodes map[string][]sdtypes.StorageNode } diff --git a/pkg/graphql/graphql_test.go b/pkg/graphql/graphql_test.go index c16651548..35d1018bf 100644 --- a/pkg/graphql/graphql_test.go +++ b/pkg/graphql/graphql_test.go @@ -28,20 +28,32 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/statediff" + "github.com/ethereum/go-ethereum/statediff/indexer" + "github.com/ethereum/go-ethereum/statediff/indexer/node" + "github.com/ethereum/go-ethereum/statediff/indexer/postgres" + sdtypes "github.com/ethereum/go-ethereum/statediff/types" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - eth2 "github.com/vulcanize/ipld-eth-indexer/pkg/eth" - "github.com/vulcanize/ipld-eth-indexer/pkg/postgres" - "github.com/vulcanize/ipld-eth-indexer/pkg/shared" "github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers" "github.com/vulcanize/ipld-eth-server/pkg/graphql" ) +// SetupDB is use to setup a db for watcher tests +func SetupDB() (*postgres.DB, error) { + uri := postgres.DbConnectionString(postgres.ConnectionParams{ + User: "vdbm", + Password: "password", + Hostname: "localhost", + Name: "vulcanize_testing", + Port: 8077, + }) + return postgres.NewDB(uri, postgres.ConnectionConfig{}, node.Info{}) +} + var _ = Describe("GraphQL", func() { const ( gqlEndPoint = "127.0.0.1:8083" @@ -66,9 +78,9 @@ var _ = Describe("GraphQL", func() { It("test init", func() { var err error - db, err = shared.SetupDB() + db, err = SetupDB() Expect(err).ToNot(HaveOccurred()) - transformer := eth2.NewStateDiffTransformer(chainConfig, db) + transformer := indexer.NewStateDiffIndexer(chainConfig, db) backend, err = eth.NewEthBackend(db, ð.Config{ ChainConfig: chainConfig, VmConfig: vm.Config{}, @@ -109,35 +121,43 @@ var _ = Describe("GraphQL", func() { var diff statediff.StateObject diff, err = builder.BuildStateDiffObject(args, params) Expect(err).ToNot(HaveOccurred()) - diffRlp, err := rlp.EncodeToBytes(diff) - Expect(err).ToNot(HaveOccurred()) - blockRlp, err := rlp.EncodeToBytes(block) - Expect(err).ToNot(HaveOccurred()) - receiptsRlp, err := rlp.EncodeToBytes(rcts) + + tx, err := transformer.PushBlock(block, rcts, mockTD) Expect(err).ToNot(HaveOccurred()) - payload := statediff.Payload{ - StateObjectRlp: diffRlp, - BlockRlp: blockRlp, - ReceiptsRlp: receiptsRlp, - TotalDifficulty: mockTD, + + for _, node := range diff.Nodes { + err = transformer.PushStateNode(tx, node) + Expect(err).ToNot(HaveOccurred()) } - _, err = transformer.Transform(0, payload) + err = tx.Close(err) Expect(err).ToNot(HaveOccurred()) } // Insert some non-canonical data into the database so that we test our ability to discern canonicity - indexAndPublisher := eth2.NewIPLDPublisher(db) + indexAndPublisher := indexer.NewStateDiffIndexer(chainConfig, db) blockHash = test_helpers.MockBlock.Hash() contractAddress = test_helpers.ContractAddr - err = indexAndPublisher.Publish(test_helpers.MockConvertedPayload) + tx, err := indexAndPublisher.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty()) + Expect(err).ToNot(HaveOccurred()) + + err = tx.Close(err) Expect(err).ToNot(HaveOccurred()) // The non-canonical header has a child - err = indexAndPublisher.Publish(test_helpers.MockConvertedPayloadForChild) + tx, err = indexAndPublisher.PushBlock(test_helpers.MockChild, test_helpers.MockReceipts, test_helpers.MockChild.Difficulty()) Expect(err).ToNot(HaveOccurred()) - err = publishCode(db, test_helpers.ContractCodeHash, test_helpers.ContractCode) + + ccHash := sdtypes.CodeAndCodeHash{ + Hash: test_helpers.CodeHash, + Code: test_helpers.ContractCode, + } + + err = indexAndPublisher.PushCodeAndCodeHash(tx, ccHash) + Expect(err).ToNot(HaveOccurred()) + + err = tx.Close(err) Expect(err).ToNot(HaveOccurred()) graphQLServer, err = graphql.New(backend, gqlEndPoint, nil, []string{"*"}, rpc.HTTPTimeouts{}) @@ -208,23 +228,3 @@ var _ = Describe("GraphQL", func() { }) }) }) - -func publishCode(db *postgres.DB, codeHash common.Hash, code []byte) error { - tx, err := db.Beginx() - if err != nil { - return err - } - - mhKey, err := shared.MultihashKeyFromKeccak256(codeHash) - if err != nil { - _ = tx.Rollback() - return err - } - - if err := shared.PublishDirect(tx, mhKey, code); err != nil { - _ = tx.Rollback() - return err - } - - return tx.Commit() -} diff --git a/pkg/serve/api.go b/pkg/serve/api.go index cbf0abaa7..e00877c41 100644 --- a/pkg/serve/api.go +++ b/pkg/serve/api.go @@ -20,10 +20,9 @@ import ( "context" "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/statediff/indexer/shared" log "github.com/sirupsen/logrus" - "github.com/vulcanize/ipld-eth-indexer/pkg/shared" - "github.com/vulcanize/ipld-eth-server/pkg/eth" ) diff --git a/pkg/serve/config.go b/pkg/serve/config.go index 772bb8346..64e984842 100644 --- a/pkg/serve/config.go +++ b/pkg/serve/config.go @@ -23,15 +23,11 @@ import ( "os" "path/filepath" - "github.com/ethereum/go-ethereum/rpc" - "github.com/vulcanize/ipld-eth-indexer/pkg/shared" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/statediff/indexer/postgres" "github.com/spf13/viper" - "github.com/vulcanize/ipld-eth-indexer/pkg/postgres" - "github.com/vulcanize/ipld-eth-indexer/utils" "github.com/vulcanize/ipld-eth-server/pkg/prom" "github.com/vulcanize/ipld-eth-server/pkg/eth" @@ -56,7 +52,8 @@ const ( // Config struct type Config struct { DB *postgres.DB - DBConfig postgres.Config + DBConfig postgres.ConnectionConfig + DBParams postgres.ConnectionParams WSEnabled bool WSEndpoint string @@ -89,17 +86,17 @@ type Config struct { func NewConfig() (*Config, error) { c := new(Config) - viper.BindEnv("ethereum.httpPath", shared.ETH_HTTP_PATH) + viper.BindEnv("ethereum.httpPath", ETH_HTTP_PATH) viper.BindEnv("ethereum.defaultSender", ETH_DEFAULT_SENDER_ADDR) viper.BindEnv("ethereum.rpcGasCap", ETH_RPC_GAS_CAP) viper.BindEnv("ethereum.chainConfig", ETH_CHAIN_CONFIG) viper.BindEnv("ethereum.supportsStateDiff", ETH_SUPPORTS_STATEDIFF) - c.DBConfig.Init() + //c.DBConfig.Init() ethHTTP := viper.GetString("ethereum.httpPath") ethHTTPEndpoint := fmt.Sprintf("http://%s", ethHTTP) - nodeInfo, cli, err := shared.GetEthNodeAndClient(ethHTTPEndpoint) + nodeInfo, cli, err := getEthNodeAndClient(ethHTTPEndpoint) if err != nil { return nil, err } @@ -184,9 +181,9 @@ func NewConfig() (*Config, error) { c.IpldGraphqlEnabled = ipldGraphqlEnabled overrideDBConnConfig(&c.DBConfig) - serveDB := utils.LoadPostgres(c.DBConfig, nodeInfo, false) - prom.RegisterDBCollector(c.DBConfig.Name, serveDB.DB) - c.DB = &serveDB + serveDB, err := postgres.NewDB(postgres.DbConnectionString(c.DBParams), c.DBConfig, nodeInfo) + prom.RegisterDBCollector(c.DBParams.Name, serveDB.DB) + c.DB = serveDB defaultSenderStr := viper.GetString("ethereum.defaultSender") if defaultSenderStr != "" { @@ -208,7 +205,7 @@ func NewConfig() (*Config, error) { return c, err } -func overrideDBConnConfig(con *postgres.Config) { +func overrideDBConnConfig(con *postgres.ConnectionConfig) { viper.BindEnv("database.server.maxIdle", SERVER_MAX_IDLE_CONNECTIONS) viper.BindEnv("database.server.maxOpen", SERVER_MAX_OPEN_CONNECTIONS) viper.BindEnv("database.server.maxLifetime", SERVER_MAX_CONN_LIFETIME) diff --git a/pkg/serve/env.go b/pkg/serve/env.go new file mode 100644 index 000000000..1bd7b5142 --- /dev/null +++ b/pkg/serve/env.go @@ -0,0 +1,41 @@ +package serve + +import ( + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/statediff/indexer/node" + "github.com/spf13/viper" +) + +// Env variables +const ( + HTTP_TIMEOUT = "HTTP_TIMEOUT" + + ETH_WS_PATH = "ETH_WS_PATH" + ETH_HTTP_PATH = "ETH_HTTP_PATH" + ETH_NODE_ID = "ETH_NODE_ID" + ETH_CLIENT_NAME = "ETH_CLIENT_NAME" + ETH_GENESIS_BLOCK = "ETH_GENESIS_BLOCK" + ETH_NETWORK_ID = "ETH_NETWORK_ID" + ETH_CHAIN_ID = "ETH_CHAIN_ID" +) + +// GetEthNodeAndClient returns eth node info and client from path url +func getEthNodeAndClient(path string) (node.Info, *rpc.Client, error) { + viper.BindEnv("ethereum.nodeID", ETH_NODE_ID) + viper.BindEnv("ethereum.clientName", ETH_CLIENT_NAME) + viper.BindEnv("ethereum.genesisBlock", ETH_GENESIS_BLOCK) + viper.BindEnv("ethereum.networkID", ETH_NETWORK_ID) + viper.BindEnv("ethereum.chainID", ETH_CHAIN_ID) + + rpcClient, err := rpc.Dial(path) + if err != nil { + return node.Info{}, nil, err + } + return node.Info{ + ID: viper.GetString("ethereum.nodeID"), + ClientName: viper.GetString("ethereum.clientName"), + GenesisBlock: viper.GetString("ethereum.genesisBlock"), + NetworkID: viper.GetString("ethereum.networkID"), + ChainID: viper.GetUint64("ethereum.chainID"), + }, rpcClient, nil +} diff --git a/pkg/serve/service.go b/pkg/serve/service.go index c5f30b477..f6e01b6c1 100644 --- a/pkg/serve/service.go +++ b/pkg/serve/service.go @@ -21,6 +21,7 @@ import ( "strconv" "sync" + "github.com/ethereum/go-ethereum/statediff/indexer/postgres" "github.com/vulcanize/ipld-eth-server/pkg/net" "github.com/ethereum/go-ethereum/core/vm" @@ -32,10 +33,6 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" log "github.com/sirupsen/logrus" - - eth2 "github.com/vulcanize/ipld-eth-indexer/pkg/eth" - "github.com/vulcanize/ipld-eth-indexer/pkg/postgres" - "github.com/vulcanize/ipld-eth-server/pkg/eth" ) @@ -52,7 +49,7 @@ type Server interface { APIs() []rpc.API Protocols() []p2p.Protocol // Pub-Sub handling event loop - Serve(wg *sync.WaitGroup, screenAndServePayload <-chan eth2.ConvertedPayload) + Serve(wg *sync.WaitGroup, screenAndServePayload <-chan eth.ConvertedPayload) // Method to subscribe to the service Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params eth.SubscriptionSettings) // Method to unsubscribe from the service @@ -145,7 +142,7 @@ func (sap *Service) APIs() []rpc.API { // It filters and sends this data to any subscribers to the service // This process can also be stood up alone, without an screenAndServePayload attached to a Sync process // and it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only -func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan eth2.ConvertedPayload) { +func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan eth.ConvertedPayload) { sap.serveWg = wg go func() { wg.Add(1) @@ -164,7 +161,7 @@ func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan eth2. } // filterAndServe filters the payload according to each subscription type and sends to the subscriptions -func (sap *Service) filterAndServe(payload eth2.ConvertedPayload) { +func (sap *Service) filterAndServe(payload eth.ConvertedPayload) { log.Debug("sending eth ipld payload to subscriptions") sap.Lock() sap.serveWg.Add(1) @@ -337,7 +334,7 @@ func (sap *Service) Unsubscribe(id rpc.ID) { func (sap *Service) Start() error { log.Info("starting eth ipld server") wg := new(sync.WaitGroup) - payloadChan := make(chan eth2.ConvertedPayload, PayloadChanBufferSize) + payloadChan := make(chan eth.ConvertedPayload, PayloadChanBufferSize) sap.Serve(wg, payloadChan) return nil } diff --git a/pkg/shared/functions.go b/pkg/shared/functions.go index 3a51e53a6..aa9c4bd96 100644 --- a/pkg/shared/functions.go +++ b/pkg/shared/functions.go @@ -18,13 +18,13 @@ package shared import ( "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld" "github.com/ipfs/go-cid" "github.com/ipfs/go-ipfs-blockstore" "github.com/ipfs/go-ipfs-ds-help" node "github.com/ipfs/go-ipld-format" "github.com/jmoiron/sqlx" "github.com/sirupsen/logrus" - "github.com/vulcanize/ipld-eth-indexer/pkg/ipfs/ipld" ) // HandleZeroAddrPointer will return an emtpy string for a nil address pointer diff --git a/pkg/shared/test_helpers.go b/pkg/shared/test_helpers.go index 1b96eeac0..35b7adfa9 100644 --- a/pkg/shared/test_helpers.go +++ b/pkg/shared/test_helpers.go @@ -19,7 +19,7 @@ package shared import ( "bytes" - "github.com/vulcanize/ipld-eth-indexer/pkg/ipfs" + "github.com/ethereum/go-ethereum/statediff/indexer/ipfs" ) // IPLDsContainBytes used to check if a list of strings contains a particular string diff --git a/test_config/test_config.go b/test_config/test_config.go index a2bb2b3b9..9f48aded2 100644 --- a/test_config/test_config.go +++ b/test_config/test_config.go @@ -19,13 +19,12 @@ package test_config import ( "errors" + "github.com/ethereum/go-ethereum/statediff/indexer/postgres" "github.com/sirupsen/logrus" "github.com/spf13/viper" - - "github.com/vulcanize/ipld-eth-indexer/pkg/postgres" ) -var DBConfig postgres.Config +var DBConfig postgres.ConnectionParams func init() { setTestConfig() @@ -53,7 +52,7 @@ func setTestConfig() { port := vip.GetInt("database.port") name := vip.GetString("database.name") - DBConfig = postgres.Config{ + DBConfig = postgres.ConnectionParams{ Hostname: hn, Name: name, Port: port,