From f2bff6f21319d6e337e787b42253d7b323c46bcd Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Fri, 4 Oct 2024 11:25:14 +0530 Subject: [PATCH] refactor: use epochs to gc eth tx hashes from chain indexer (#12516) * refactor: use epochs to gc eth tx hashes from chain indexer * test: update gc test * fix: linter issue * refactor: gc test * add comments explaining calculation for epoch duration * address comments --- chain/index/ddls.go | 2 +- chain/index/ddls_test.go | 44 ------- chain/index/gc.go | 32 +++-- chain/index/gc_test.go | 241 +++++++++++++++++++++++++-------- chain/index/helper_test.go | 264 +++++++++++++++++++++++++++++++++++++ chain/index/indexer.go | 2 +- chain/index/read_test.go | 99 -------------- go.mod | 1 + 8 files changed, 476 insertions(+), 209 deletions(-) create mode 100644 chain/index/helper_test.go diff --git a/chain/index/ddls.go b/chain/index/ddls.go index 5acfb8b87be..de298bf8d4a 100644 --- a/chain/index/ddls.go +++ b/chain/index/ddls.go @@ -67,7 +67,7 @@ func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string { &ps.updateTipsetToNonRevertedStmt: "UPDATE tipset_message SET reverted = 0 WHERE tipset_key_cid = ?", &ps.updateTipsetToRevertedStmt: "UPDATE tipset_message SET reverted = 1 WHERE tipset_key_cid = ?", &ps.removeTipsetsBeforeHeightStmt: "DELETE FROM tipset_message WHERE height < ?", - &ps.removeEthHashesOlderThanStmt: "DELETE FROM eth_tx_hash WHERE inserted_at < datetime('now', ?)", + &ps.removeEthHashesBeforeTimeStmt: "DELETE FROM eth_tx_hash WHERE inserted_at < ?", &ps.updateTipsetsToRevertedFromHeightStmt: "UPDATE tipset_message SET reverted = 1 WHERE height >= ?", &ps.updateEventsToRevertedFromHeightStmt: "UPDATE event SET reverted = 1 WHERE message_id IN (SELECT message_id FROM tipset_message WHERE height >= ?)", &ps.isIndexEmptyStmt: "SELECT NOT EXISTS(SELECT 1 FROM tipset_message LIMIT 1)", diff --git a/chain/index/ddls_test.go b/chain/index/ddls_test.go index 69b09f975c8..45ee5dd66b8 100644 --- a/chain/index/ddls_test.go +++ b/chain/index/ddls_test.go @@ -662,47 +662,3 @@ func verifyNonRevertedEventEntriesCount(t *testing.T, s *SqliteIndexer, tipsetKe require.NoError(t, err) require.Equal(t, expectedCount, count) } - -func insertTipsetMessage(t *testing.T, s *SqliteIndexer, ts tipsetMessage) int64 { - res, err := s.stmts.insertTipsetMessageStmt.Exec(ts.tipsetKeyCid, ts.height, ts.reverted, ts.messageCid, ts.messageIndex) - require.NoError(t, err) - - rowsAffected, err := res.RowsAffected() - require.NoError(t, err) - require.Equal(t, int64(1), rowsAffected) - - messageID, err := res.LastInsertId() - require.NoError(t, err) - require.NotEqual(t, int64(0), messageID) - - // read back the message to verify it was inserted correctly - verifyTipsetMessage(t, s, messageID, ts) - - return messageID -} - -func insertEvent(t *testing.T, s *SqliteIndexer, e event) int64 { - res, err := s.stmts.insertEventStmt.Exec(e.messageID, e.eventIndex, e.emitterAddr, e.reverted) - require.NoError(t, err) - - rowsAffected, err := res.RowsAffected() - require.NoError(t, err) - require.Equal(t, int64(1), rowsAffected) - - eventID, err := res.LastInsertId() - require.NoError(t, err) - require.NotEqual(t, int64(0), eventID) - - verifyEvent(t, s, eventID, e) - - return eventID -} - -func insertEventEntry(t *testing.T, s *SqliteIndexer, ee eventEntry) { - res, err := s.stmts.insertEventEntryStmt.Exec(ee.eventID, ee.indexed, ee.flags, ee.key, ee.codec, ee.value) - require.NoError(t, err) - - rowsAffected, err := res.RowsAffected() - require.NoError(t, err) - require.Equal(t, int64(1), rowsAffected) -} diff --git a/chain/index/gc.go b/chain/index/gc.go index 727944e3142..320951b22f3 100644 --- a/chain/index/gc.go +++ b/chain/index/gc.go @@ -2,7 +2,6 @@ package index import ( "context" - "strconv" "time" logging "github.com/ipfs/go-log/v2" @@ -15,6 +14,8 @@ var ( cleanupInterval = time.Duration(4) * time.Hour ) +const graceEpochs = 10 + func (si *SqliteIndexer) gcLoop() { defer si.wg.Done() @@ -50,7 +51,7 @@ func (si *SqliteIndexer) gc(ctx context.Context) { head := si.cs.GetHeaviestTipSet() - removalEpoch := int64(head.Height()) - si.gcRetentionEpochs - 10 // 10 is for some grace period + removalEpoch := int64(head.Height()) - si.gcRetentionEpochs - graceEpochs if removalEpoch <= 0 { log.Info("no tipsets to gc") return @@ -75,17 +76,28 @@ func (si *SqliteIndexer) gc(ctx context.Context) { // ------------------------------------------------------------------------------------------------- // Also GC eth hashes - // Convert gcRetentionEpochs to number of days - gcRetentionDays := si.gcRetentionEpochs / (builtin.EpochsInDay) - if gcRetentionDays < 1 { - log.Infof("skipping gc of eth hashes as retention days is less than 1") + // Calculate the retention duration based on the number of epochs to retain. + // retentionDuration represents the total duration (in seconds) for which data should be retained before considering it for garbage collection. + // graceDuration represents the additional duration (in seconds) to retain data after the retention duration. + // Since time.Duration expects a nanosecond value, we multiply the total seconds by time.Second to convert it to seconds. + retentionDuration := time.Duration(si.gcRetentionEpochs*builtin.EpochDurationSeconds) * time.Second + graceDuration := time.Duration(graceEpochs*builtin.EpochDurationSeconds) * time.Second + + // Calculate the total duration to retain data. + totalRetentionDuration := retentionDuration + graceDuration + currHeadTime := time.Unix(int64(head.MinTimestamp()), 0) + // gcTime is the time that is (gcRetentionEpochs + graceEpochs) in seconds before currHeadTime + gcTime := currHeadTime.Add(-totalRetentionDuration) + if gcTime.Before(time.Unix(0, 0)) || gcTime.IsZero() { + log.Info("gcTime is invalid, skipping gc") return } - log.Infof("gc'ing eth hashes older than %d days", gcRetentionDays) - res, err = si.stmts.removeEthHashesOlderThanStmt.ExecContext(ctx, "-"+strconv.Itoa(int(gcRetentionDays))+" day") + log.Infof("gc'ing eth hashes before time %s", gcTime.UTC().String()) + + res, err = si.stmts.removeEthHashesBeforeTimeStmt.ExecContext(ctx, gcTime.Unix()) if err != nil { - log.Errorf("failed to gc eth hashes older than %d days: %w", gcRetentionDays, err) + log.Errorf("failed to gc eth hashes before time %s: %w", gcTime.String(), err) return } @@ -95,5 +107,5 @@ func (si *SqliteIndexer) gc(ctx context.Context) { return } - log.Infof("gc'd %d eth hashes older than %d days", rows, gcRetentionDays) + log.Infof("gc'd %d eth hashes before time %s", rows, gcTime.String()) } diff --git a/chain/index/gc_test.go b/chain/index/gc_test.go index 2be23240c91..fc98b5da37e 100644 --- a/chain/index/gc_test.go +++ b/chain/index/gc_test.go @@ -6,62 +6,195 @@ import ( "testing" "time" + "github.com/ipfs/go-cid" "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-state-types/abi" +) + +const ( + epochOne = 1 + epochTen = 10 + epochFifty = 50 + headEpoch = 60 + + validRetentionEpochs = 20 + highRetentionEpochs = 100 + lowRetentionEpochs = 1 ) func TestGC(t *testing.T) { - ctx := context.Background() - rng := pseudo.New(pseudo.NewSource(time.Now().UnixNano())) - - // head at height 60 - // insert tipsets at heigh 1,10,50. - // retention epochs is 20 - si, _, _ := setupWithHeadIndexed(t, 60, rng) - si.gcRetentionEpochs = 20 - defer func() { _ = si.Close() }() - - tsCid1 := randomCid(t, rng) - tsCid10 := randomCid(t, rng) - tsCid50 := randomCid(t, rng) - - insertTipsetMessage(t, si, tipsetMessage{ - tipsetKeyCid: tsCid1.Bytes(), - height: 1, - reverted: false, - messageCid: randomCid(t, rng).Bytes(), - messageIndex: 0, - }) - - insertTipsetMessage(t, si, tipsetMessage{ - tipsetKeyCid: tsCid10.Bytes(), - height: 10, - reverted: false, - messageCid: randomCid(t, rng).Bytes(), - messageIndex: 0, - }) - - insertTipsetMessage(t, si, tipsetMessage{ - tipsetKeyCid: tsCid50.Bytes(), - height: 50, - reverted: false, - messageCid: randomCid(t, rng).Bytes(), - messageIndex: 0, - }) - - si.gc(ctx) - - // tipset at height 1 and 10 should be removed - var count int - err := si.db.QueryRow("SELECT COUNT(*) FROM tipset_message WHERE height = 1").Scan(&count) - require.NoError(t, err) - require.Equal(t, 0, count) - - err = si.db.QueryRow("SELECT COUNT(*) FROM tipset_message WHERE height = 10").Scan(&count) - require.NoError(t, err) - require.Equal(t, 0, count) - - // tipset at height 50 should not be removed - err = si.db.QueryRow("SELECT COUNT(*) FROM tipset_message WHERE height = 50").Scan(&count) - require.NoError(t, err) - require.Equal(t, 1, count) + type tipsetData struct { + height abi.ChainEpoch + reverted bool + } + + tests := []struct { + name string + headHeight abi.ChainEpoch + gcRetentionEpochs int64 + timestamp uint64 // Minimum timestamp for the head TipSet + tipsets []tipsetData + expectedEpochTipsetDataCounts map[abi.ChainEpoch]int // expected data count(tipsetMsg, event, eventEntry), for each epoch + expectedEthTxHashCount int // expected eth tx hash count after gc + }{ + { + name: "Basic GC with some tipsets removed", + headHeight: headEpoch, + gcRetentionEpochs: validRetentionEpochs, + timestamp: 0, + tipsets: []tipsetData{ + {height: epochOne, reverted: false}, + {height: epochTen, reverted: false}, + {height: epochFifty, reverted: false}, + }, + expectedEpochTipsetDataCounts: map[abi.ChainEpoch]int{ + epochOne: 0, // Should be removed + epochTen: 0, // Should be removed + epochFifty: 1, // Should remain + }, + expectedEthTxHashCount: 1, // Only the entry for height 50 should remain + }, + { + name: "No GC when retention epochs is high", + headHeight: headEpoch, + gcRetentionEpochs: highRetentionEpochs, + timestamp: 0, + tipsets: []tipsetData{ + {height: epochOne, reverted: false}, + {height: epochTen, reverted: false}, + {height: epochFifty, reverted: false}, + }, + expectedEpochTipsetDataCounts: map[abi.ChainEpoch]int{ + epochOne: 1, // Should remain + epochTen: 1, // Should remain + epochFifty: 1, // Should remain + }, + expectedEthTxHashCount: 3, // All entries should remain + }, + { + name: "No GC when gcRetentionEpochs is zero", + headHeight: headEpoch, + gcRetentionEpochs: 0, + timestamp: 0, + tipsets: []tipsetData{ + {height: epochOne, reverted: false}, + {height: epochTen, reverted: false}, + {height: epochFifty, reverted: false}, + }, + expectedEpochTipsetDataCounts: map[abi.ChainEpoch]int{ + epochOne: 1, // Should remain + epochTen: 1, // Should remain + epochFifty: 1, // Should remain + }, + expectedEthTxHashCount: 3, // All entries should remain + }, + { + name: "GC should remove tipsets that are older than gcRetentionEpochs + gracEpochs", + headHeight: headEpoch, + gcRetentionEpochs: lowRetentionEpochs, // headHeight - gcRetentionEpochs - graceEpochs = 60 - 5 - 10 = 45 (removalEpoch) + timestamp: 0, + tipsets: []tipsetData{ + {height: epochFifty, reverted: false}, + {height: epochTen, reverted: false}, + {height: epochOne, reverted: false}, + }, + expectedEpochTipsetDataCounts: map[abi.ChainEpoch]int{ + epochOne: 0, // Should be removed + epochTen: 0, // Should be removed + epochFifty: 1, // Should remain + }, + expectedEthTxHashCount: 1, // Only the entry for height 50 should remain + }, + { + name: "skip gc if gcTime is zero", + headHeight: validRetentionEpochs + graceEpochs + 1, // adding 1 to headHeight to ensure removal epoch is not zero + gcRetentionEpochs: validRetentionEpochs, // removalEpoch = 1 + timestamp: 300, // totalRetentionDuration = (20+10)*10 = 300 seconds + tipsets: []tipsetData{ + {height: epochOne, reverted: false}, + {height: epochTen, reverted: false}, + {height: epochFifty, reverted: false}, + }, + expectedEpochTipsetDataCounts: map[abi.ChainEpoch]int{ + epochOne: 1, // Should remain + epochTen: 1, // Should remain + epochFifty: 1, // Should remain + }, + expectedEthTxHashCount: 3, // All entries should remain + }, + { + name: "Skip GC when gcTime is negative", + headHeight: validRetentionEpochs + graceEpochs + 1, // adding 1 to headHeight to ensure removal epoch is not zero + gcRetentionEpochs: validRetentionEpochs, // removalEpoch = 1 + timestamp: 200, // totalRetentionDuration = (20+10)*10 =300 seconds, gcTime = 200 -300 = -100 seconds + tipsets: []tipsetData{ + {height: epochOne, reverted: false}, + {height: epochTen, reverted: false}, + {height: epochFifty, reverted: false}, + }, + expectedEpochTipsetDataCounts: map[abi.ChainEpoch]int{ + epochOne: 1, // Should remain + epochTen: 1, // Should remain + epochFifty: 1, // Should remain + }, + expectedEthTxHashCount: 3, // All entries should remain + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + genesisTime := time.Now() + rng := pseudo.New(pseudo.NewSource(genesisTime.UnixNano())) + + // setup indexer with head tipset + ts := randomTipsetWithTimestamp(t, rng, tt.headHeight, []cid.Cid{}, tt.timestamp) + d := newDummyChainStore() + d.SetHeaviestTipSet(ts) + si, err := NewSqliteIndexer(":memory:", d, 0, false, 0) + require.NoError(t, err) + insertHead(t, si, ts, tt.headHeight) + + // set gc retention epochs + si.gcRetentionEpochs = tt.gcRetentionEpochs + + tipsetKeyCids := make(map[abi.ChainEpoch]cid.Cid) + + for _, tsData := range tt.tipsets { + t.Logf("inserting tipset at height %d", tsData.height) + + tsKeyCid, _, _ := insertRandomTipsetAtHeight(t, si, uint64(tsData.height), tsData.reverted, genesisTime) + tipsetKeyCids[tsData.height] = tsKeyCid + } + + si.gc(ctx) + + for height, expectedCount := range tt.expectedEpochTipsetDataCounts { + var count int + + err := si.db.QueryRow("SELECT COUNT(*) FROM tipset_message WHERE height = ?", height).Scan(&count) + require.NoError(t, err) + require.Equal(t, expectedCount, count, "Unexpected tipset_message count for height %d", height) + + tsKeyCid := tipsetKeyCids[height] + err = si.stmts.getNonRevertedTipsetEventCountStmt.QueryRow(tsKeyCid.Bytes()).Scan(&count) + require.NoError(t, err) + require.Equal(t, expectedCount, count, "Unexpected events count for height %d", height) + + err = si.stmts.getNonRevertedTipsetEventEntriesCountStmt.QueryRow(tsKeyCid.Bytes()).Scan(&count) + require.NoError(t, err) + require.Equal(t, expectedCount, count, "Unexpected event_entries count for height %d", height) + } + + var ethTxHashCount int + err = si.db.QueryRow("SELECT COUNT(*) FROM eth_tx_hash").Scan(ðTxHashCount) + require.NoError(t, err) + require.Equal(t, tt.expectedEthTxHashCount, ethTxHashCount, "Unexpected eth_tx_hash count") + + t.Cleanup(func() { + cleanup(t, si) + }) + }) + } } diff --git a/chain/index/helper_test.go b/chain/index/helper_test.go new file mode 100644 index 00000000000..861402d3f68 --- /dev/null +++ b/chain/index/helper_test.go @@ -0,0 +1,264 @@ +package index + +import ( + pseudo "math/rand" + "testing" + "time" + + "github.com/ipfs/go-cid" + mh "github.com/multiformats/go-multihash" + "github.com/test-go/testify/require" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/crypto" + + "github.com/filecoin-project/lotus/chain/actors/builtin" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/types/ethtypes" +) + +func randomIDAddr(tb testing.TB, rng *pseudo.Rand) address.Address { + tb.Helper() + addr, err := address.NewIDAddress(uint64(rng.Int63())) + require.NoError(tb, err) + return addr +} + +func randomCid(tb testing.TB, rng *pseudo.Rand) cid.Cid { + tb.Helper() + cb := cid.V1Builder{Codec: cid.Raw, MhType: mh.IDENTITY} + c, err := cb.Sum(randomBytes(10, rng)) + require.NoError(tb, err) + return c +} + +func randomBytes(n int, rng *pseudo.Rand) []byte { + buf := make([]byte, n) + rng.Read(buf) + return buf +} + +func randomTipsetWithTimestamp(tb testing.TB, rng *pseudo.Rand, h abi.ChainEpoch, parents []cid.Cid, timeStamp uint64) *types.TipSet { + tb.Helper() + + if timeStamp == 0 { + timeStamp = uint64(time.Now().Add(time.Duration(h) * builtin.EpochDurationSeconds * time.Second).Unix()) + } + + ts, err := types.NewTipSet([]*types.BlockHeader{ + { + Height: h, + Miner: randomIDAddr(tb, rng), + + Parents: parents, + + Ticket: &types.Ticket{VRFProof: []byte{byte(h % 2)}}, + + ParentStateRoot: randomCid(tb, rng), + Messages: randomCid(tb, rng), + ParentMessageReceipts: randomCid(tb, rng), + + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, + + Timestamp: timeStamp, + }, + { + Height: h, + Miner: randomIDAddr(tb, rng), + + Parents: parents, + + Ticket: &types.Ticket{VRFProof: []byte{byte((h + 1) % 2)}}, + + ParentStateRoot: randomCid(tb, rng), + Messages: randomCid(tb, rng), + ParentMessageReceipts: randomCid(tb, rng), + + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, + + Timestamp: timeStamp, + }, + }) + + require.NoError(tb, err) + + return ts +} + +func fakeTipSet(tb testing.TB, rng *pseudo.Rand, h abi.ChainEpoch, parents []cid.Cid) *types.TipSet { + tb.Helper() + + ts, err := types.NewTipSet([]*types.BlockHeader{ + { + Height: h, + Miner: randomIDAddr(tb, rng), + + Parents: parents, + + Ticket: &types.Ticket{VRFProof: []byte{byte(h % 2)}}, + + ParentStateRoot: randomCid(tb, rng), + Messages: randomCid(tb, rng), + ParentMessageReceipts: randomCid(tb, rng), + + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, + }, + { + Height: h, + Miner: randomIDAddr(tb, rng), + + Parents: parents, + + Ticket: &types.Ticket{VRFProof: []byte{byte((h + 1) % 2)}}, + + ParentStateRoot: randomCid(tb, rng), + Messages: randomCid(tb, rng), + ParentMessageReceipts: randomCid(tb, rng), + + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, + }, + }) + + require.NoError(tb, err) + + return ts +} + +func setupWithHeadIndexed(t *testing.T, headHeight abi.ChainEpoch, rng *pseudo.Rand) (*SqliteIndexer, *types.TipSet, *dummyChainStore) { + head := fakeTipSet(t, rng, headHeight, []cid.Cid{}) + d := newDummyChainStore() + d.SetHeaviestTipSet(head) + + s, err := NewSqliteIndexer(":memory:", d, 0, false, 0) + require.NoError(t, err) + insertHead(t, s, head, headHeight) + + return s, head, d +} + +func cleanup(t *testing.T, s *SqliteIndexer) { + err := s.Close() + require.NoError(t, err) +} + +func insertHead(t *testing.T, s *SqliteIndexer, head *types.TipSet, height abi.ChainEpoch) { + headKeyBytes, err := toTipsetKeyCidBytes(head) + require.NoError(t, err) + + insertTipsetMessage(t, s, tipsetMessage{ + tipsetKeyCid: headKeyBytes, + height: uint64(height), + reverted: false, + messageCid: nil, + messageIndex: -1, + }) +} + +func insertEthTxHash(t *testing.T, s *SqliteIndexer, ethTxHash ethtypes.EthHash, messageCid cid.Cid) { + msgCidBytes := messageCid.Bytes() + + res, err := s.stmts.insertEthTxHashStmt.Exec(ethTxHash.String(), msgCidBytes) + require.NoError(t, err) + rowsAffected, err := res.RowsAffected() + require.NoError(t, err) + require.Equal(t, int64(1), rowsAffected) +} + +func insertRandomTipsetAtHeight(t *testing.T, si *SqliteIndexer, height uint64, reverted bool, genesisTime time.Time) (cid.Cid, cid.Cid, int64) { + rng := pseudo.New(pseudo.NewSource(time.Now().UnixNano())) + tsCid := randomCid(t, rng) + msgCid := randomCid(t, rng) + + msgID := insertTipsetMessage(t, si, tipsetMessage{ + tipsetKeyCid: tsCid.Bytes(), + height: height, + reverted: reverted, + messageCid: msgCid.Bytes(), + messageIndex: 0, + }) + + eventID := insertEvent(t, si, event{ + messageID: msgID, + eventIndex: 0, + emitterAddr: randomIDAddr(t, rng).Bytes(), + reverted: reverted, + }) + + insertEthTxHashAtTimeStamp( + t, + si, + ethtypes.EthHash(randomBytes(32, rng)), + msgCid, + genesisTime.Add(time.Duration(height)*builtin.EpochDurationSeconds*time.Second).Unix(), + ) + + insertEventEntry(t, si, eventEntry{ + eventID: eventID, + indexed: true, + flags: []byte("test_data"), + key: "test_key", + codec: 0, + value: []byte("test_value"), + }) + + return tsCid, msgCid, eventID +} + +func insertEthTxHashAtTimeStamp(t *testing.T, s *SqliteIndexer, ethTxHash ethtypes.EthHash, messageCid cid.Cid, timestamp int64) { + msgCidBytes := messageCid.Bytes() + + res, err := s.db.Exec("INSERT INTO eth_tx_hash (tx_hash, message_cid, inserted_at) VALUES (?, ?, ?)", ethTxHash.String(), msgCidBytes, timestamp) + require.NoError(t, err) + rowsAffected, err := res.RowsAffected() + require.NoError(t, err) + require.Equal(t, int64(1), rowsAffected) +} + +func insertTipsetMessage(t *testing.T, s *SqliteIndexer, ts tipsetMessage) int64 { + res, err := s.stmts.insertTipsetMessageStmt.Exec(ts.tipsetKeyCid, ts.height, ts.reverted, ts.messageCid, ts.messageIndex) + require.NoError(t, err) + + rowsAffected, err := res.RowsAffected() + require.NoError(t, err) + require.Equal(t, int64(1), rowsAffected) + + messageID, err := res.LastInsertId() + require.NoError(t, err) + require.NotEqual(t, int64(0), messageID) + + // read back the message to verify it was inserted correctly + verifyTipsetMessage(t, s, messageID, ts) + + return messageID +} + +func insertEvent(t *testing.T, s *SqliteIndexer, e event) int64 { + res, err := s.stmts.insertEventStmt.Exec(e.messageID, e.eventIndex, e.emitterAddr, e.reverted) + require.NoError(t, err) + + rowsAffected, err := res.RowsAffected() + require.NoError(t, err) + require.Equal(t, int64(1), rowsAffected) + + eventID, err := res.LastInsertId() + require.NoError(t, err) + require.NotEqual(t, int64(0), eventID) + + verifyEvent(t, s, eventID, e) + + return eventID +} + +func insertEventEntry(t *testing.T, s *SqliteIndexer, ee eventEntry) { + res, err := s.stmts.insertEventEntryStmt.Exec(ee.eventID, ee.indexed, ee.flags, ee.key, ee.codec, ee.value) + require.NoError(t, err) + + rowsAffected, err := res.RowsAffected() + require.NoError(t, err) + require.Equal(t, int64(1), rowsAffected) +} diff --git a/chain/index/indexer.go b/chain/index/indexer.go index d537ab18fdd..b3d5c7041a1 100644 --- a/chain/index/indexer.go +++ b/chain/index/indexer.go @@ -34,7 +34,7 @@ type preparedStatements struct { hasTipsetStmt *sql.Stmt updateTipsetToNonRevertedStmt *sql.Stmt removeTipsetsBeforeHeightStmt *sql.Stmt - removeEthHashesOlderThanStmt *sql.Stmt + removeEthHashesBeforeTimeStmt *sql.Stmt updateTipsetsToRevertedFromHeightStmt *sql.Stmt updateEventsToRevertedFromHeightStmt *sql.Stmt isIndexEmptyStmt *sql.Stmt diff --git a/chain/index/read_test.go b/chain/index/read_test.go index c7a6797d0bf..7a8dc8bdce8 100644 --- a/chain/index/read_test.go +++ b/chain/index/read_test.go @@ -9,12 +9,9 @@ import ( "time" "github.com/ipfs/go-cid" - mh "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" - "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/lotus/chain/actors/adt" "github.com/filecoin-project/lotus/chain/types" @@ -79,41 +76,6 @@ func TestGetMsgInfo(t *testing.T) { require.Equal(t, abi.ChainEpoch(1), mi.Epoch) } -func setupWithHeadIndexed(t *testing.T, headHeight abi.ChainEpoch, rng *pseudo.Rand) (*SqliteIndexer, *types.TipSet, *dummyChainStore) { - head := fakeTipSet(t, rng, headHeight, []cid.Cid{}) - d := newDummyChainStore() - d.SetHeaviestTipSet(head) - - s, err := NewSqliteIndexer(":memory:", d, 0, false, 0) - require.NoError(t, err) - insertHead(t, s, head, headHeight) - - return s, head, d -} - -func insertHead(t *testing.T, s *SqliteIndexer, head *types.TipSet, height abi.ChainEpoch) { - headKeyBytes, err := toTipsetKeyCidBytes(head) - require.NoError(t, err) - - insertTipsetMessage(t, s, tipsetMessage{ - tipsetKeyCid: headKeyBytes, - height: uint64(height), - reverted: false, - messageCid: nil, - messageIndex: -1, - }) -} - -func insertEthTxHash(t *testing.T, s *SqliteIndexer, ethTxHash ethtypes.EthHash, messageCid cid.Cid) { - msgCidBytes := messageCid.Bytes() - - res, err := s.stmts.insertEthTxHashStmt.Exec(ethTxHash.String(), msgCidBytes) - require.NoError(t, err) - rowsAffected, err := res.RowsAffected() - require.NoError(t, err) - require.Equal(t, int64(1), rowsAffected) -} - type dummyChainStore struct { mu sync.RWMutex @@ -240,64 +202,3 @@ func (d *dummyChainStore) SetTipsetByHeightAndKey(h abi.ChainEpoch, tsk types.Ti d.heightToTipSet[h] = ts d.keyToTipSet[tsk] = ts } - -func randomIDAddr(tb testing.TB, rng *pseudo.Rand) address.Address { - tb.Helper() - addr, err := address.NewIDAddress(uint64(rng.Int63())) - require.NoError(tb, err) - return addr -} - -func randomCid(tb testing.TB, rng *pseudo.Rand) cid.Cid { - tb.Helper() - cb := cid.V1Builder{Codec: cid.Raw, MhType: mh.IDENTITY} - c, err := cb.Sum(randomBytes(10, rng)) - require.NoError(tb, err) - return c -} - -func randomBytes(n int, rng *pseudo.Rand) []byte { - buf := make([]byte, n) - rng.Read(buf) - return buf -} - -func fakeTipSet(tb testing.TB, rng *pseudo.Rand, h abi.ChainEpoch, parents []cid.Cid) *types.TipSet { - tb.Helper() - ts, err := types.NewTipSet([]*types.BlockHeader{ - { - Height: h, - Miner: randomIDAddr(tb, rng), - - Parents: parents, - - Ticket: &types.Ticket{VRFProof: []byte{byte(h % 2)}}, - - ParentStateRoot: randomCid(tb, rng), - Messages: randomCid(tb, rng), - ParentMessageReceipts: randomCid(tb, rng), - - BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, - BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, - }, - { - Height: h, - Miner: randomIDAddr(tb, rng), - - Parents: parents, - - Ticket: &types.Ticket{VRFProof: []byte{byte((h + 1) % 2)}}, - - ParentStateRoot: randomCid(tb, rng), - Messages: randomCid(tb, rng), - ParentMessageReceipts: randomCid(tb, rng), - - BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, - BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, - }, - }) - - require.NoError(tb, err) - - return ts -} diff --git a/go.mod b/go.mod index 886bd556f34..c6f36b29ff0 100644 --- a/go.mod +++ b/go.mod @@ -134,6 +134,7 @@ require ( github.com/sirupsen/logrus v1.9.2 github.com/stretchr/testify v1.9.0 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 + github.com/test-go/testify v1.1.4 github.com/triplewz/poseidon v0.0.1 github.com/urfave/cli/v2 v2.25.5 github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba