Skip to content

Commit

Permalink
refactor: use epochs to gc eth tx hashes from chain indexer (#12516)
Browse files Browse the repository at this point in the history
* refactor: use epochs to gc eth tx hashes from chain indexer

* test: update gc test

* fix: linter issue

* refactor: gc test

* add comments explaining calculation for epoch duration

* address comments
  • Loading branch information
akaladarshi authored Oct 4, 2024
1 parent c0a6ea0 commit f2bff6f
Show file tree
Hide file tree
Showing 8 changed files with 476 additions and 209 deletions.
2 changes: 1 addition & 1 deletion chain/index/ddls.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string {
&ps.updateTipsetToNonRevertedStmt: "UPDATE tipset_message SET reverted = 0 WHERE tipset_key_cid = ?",
&ps.updateTipsetToRevertedStmt: "UPDATE tipset_message SET reverted = 1 WHERE tipset_key_cid = ?",
&ps.removeTipsetsBeforeHeightStmt: "DELETE FROM tipset_message WHERE height < ?",
&ps.removeEthHashesOlderThanStmt: "DELETE FROM eth_tx_hash WHERE inserted_at < datetime('now', ?)",
&ps.removeEthHashesBeforeTimeStmt: "DELETE FROM eth_tx_hash WHERE inserted_at < ?",
&ps.updateTipsetsToRevertedFromHeightStmt: "UPDATE tipset_message SET reverted = 1 WHERE height >= ?",
&ps.updateEventsToRevertedFromHeightStmt: "UPDATE event SET reverted = 1 WHERE message_id IN (SELECT message_id FROM tipset_message WHERE height >= ?)",
&ps.isIndexEmptyStmt: "SELECT NOT EXISTS(SELECT 1 FROM tipset_message LIMIT 1)",
Expand Down
44 changes: 0 additions & 44 deletions chain/index/ddls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,47 +662,3 @@ func verifyNonRevertedEventEntriesCount(t *testing.T, s *SqliteIndexer, tipsetKe
require.NoError(t, err)
require.Equal(t, expectedCount, count)
}

func insertTipsetMessage(t *testing.T, s *SqliteIndexer, ts tipsetMessage) int64 {
res, err := s.stmts.insertTipsetMessageStmt.Exec(ts.tipsetKeyCid, ts.height, ts.reverted, ts.messageCid, ts.messageIndex)
require.NoError(t, err)

rowsAffected, err := res.RowsAffected()
require.NoError(t, err)
require.Equal(t, int64(1), rowsAffected)

messageID, err := res.LastInsertId()
require.NoError(t, err)
require.NotEqual(t, int64(0), messageID)

// read back the message to verify it was inserted correctly
verifyTipsetMessage(t, s, messageID, ts)

return messageID
}

func insertEvent(t *testing.T, s *SqliteIndexer, e event) int64 {
res, err := s.stmts.insertEventStmt.Exec(e.messageID, e.eventIndex, e.emitterAddr, e.reverted)
require.NoError(t, err)

rowsAffected, err := res.RowsAffected()
require.NoError(t, err)
require.Equal(t, int64(1), rowsAffected)

eventID, err := res.LastInsertId()
require.NoError(t, err)
require.NotEqual(t, int64(0), eventID)

verifyEvent(t, s, eventID, e)

return eventID
}

func insertEventEntry(t *testing.T, s *SqliteIndexer, ee eventEntry) {
res, err := s.stmts.insertEventEntryStmt.Exec(ee.eventID, ee.indexed, ee.flags, ee.key, ee.codec, ee.value)
require.NoError(t, err)

rowsAffected, err := res.RowsAffected()
require.NoError(t, err)
require.Equal(t, int64(1), rowsAffected)
}
32 changes: 22 additions & 10 deletions chain/index/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package index

import (
"context"
"strconv"
"time"

logging "github.com/ipfs/go-log/v2"
Expand All @@ -15,6 +14,8 @@ var (
cleanupInterval = time.Duration(4) * time.Hour
)

const graceEpochs = 10

func (si *SqliteIndexer) gcLoop() {
defer si.wg.Done()

Expand Down Expand Up @@ -50,7 +51,7 @@ func (si *SqliteIndexer) gc(ctx context.Context) {

head := si.cs.GetHeaviestTipSet()

removalEpoch := int64(head.Height()) - si.gcRetentionEpochs - 10 // 10 is for some grace period
removalEpoch := int64(head.Height()) - si.gcRetentionEpochs - graceEpochs
if removalEpoch <= 0 {
log.Info("no tipsets to gc")
return
Expand All @@ -75,17 +76,28 @@ func (si *SqliteIndexer) gc(ctx context.Context) {
// -------------------------------------------------------------------------------------------------
// Also GC eth hashes

// Convert gcRetentionEpochs to number of days
gcRetentionDays := si.gcRetentionEpochs / (builtin.EpochsInDay)
if gcRetentionDays < 1 {
log.Infof("skipping gc of eth hashes as retention days is less than 1")
// Calculate the retention duration based on the number of epochs to retain.
// retentionDuration represents the total duration (in seconds) for which data should be retained before considering it for garbage collection.
// graceDuration represents the additional duration (in seconds) to retain data after the retention duration.
// Since time.Duration expects a nanosecond value, we multiply the total seconds by time.Second to convert it to seconds.
retentionDuration := time.Duration(si.gcRetentionEpochs*builtin.EpochDurationSeconds) * time.Second
graceDuration := time.Duration(graceEpochs*builtin.EpochDurationSeconds) * time.Second

// Calculate the total duration to retain data.
totalRetentionDuration := retentionDuration + graceDuration
currHeadTime := time.Unix(int64(head.MinTimestamp()), 0)
// gcTime is the time that is (gcRetentionEpochs + graceEpochs) in seconds before currHeadTime
gcTime := currHeadTime.Add(-totalRetentionDuration)
if gcTime.Before(time.Unix(0, 0)) || gcTime.IsZero() {
log.Info("gcTime is invalid, skipping gc")
return
}

log.Infof("gc'ing eth hashes older than %d days", gcRetentionDays)
res, err = si.stmts.removeEthHashesOlderThanStmt.ExecContext(ctx, "-"+strconv.Itoa(int(gcRetentionDays))+" day")
log.Infof("gc'ing eth hashes before time %s", gcTime.UTC().String())

res, err = si.stmts.removeEthHashesBeforeTimeStmt.ExecContext(ctx, gcTime.Unix())
if err != nil {
log.Errorf("failed to gc eth hashes older than %d days: %w", gcRetentionDays, err)
log.Errorf("failed to gc eth hashes before time %s: %w", gcTime.String(), err)
return
}

Expand All @@ -95,5 +107,5 @@ func (si *SqliteIndexer) gc(ctx context.Context) {
return
}

log.Infof("gc'd %d eth hashes older than %d days", rows, gcRetentionDays)
log.Infof("gc'd %d eth hashes before time %s", rows, gcTime.String())
}
241 changes: 187 additions & 54 deletions chain/index/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,62 +6,195 @@ import (
"testing"
"time"

"github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-state-types/abi"
)

const (
epochOne = 1
epochTen = 10
epochFifty = 50
headEpoch = 60

validRetentionEpochs = 20
highRetentionEpochs = 100
lowRetentionEpochs = 1
)

func TestGC(t *testing.T) {
ctx := context.Background()
rng := pseudo.New(pseudo.NewSource(time.Now().UnixNano()))

// head at height 60
// insert tipsets at heigh 1,10,50.
// retention epochs is 20
si, _, _ := setupWithHeadIndexed(t, 60, rng)
si.gcRetentionEpochs = 20
defer func() { _ = si.Close() }()

tsCid1 := randomCid(t, rng)
tsCid10 := randomCid(t, rng)
tsCid50 := randomCid(t, rng)

insertTipsetMessage(t, si, tipsetMessage{
tipsetKeyCid: tsCid1.Bytes(),
height: 1,
reverted: false,
messageCid: randomCid(t, rng).Bytes(),
messageIndex: 0,
})

insertTipsetMessage(t, si, tipsetMessage{
tipsetKeyCid: tsCid10.Bytes(),
height: 10,
reverted: false,
messageCid: randomCid(t, rng).Bytes(),
messageIndex: 0,
})

insertTipsetMessage(t, si, tipsetMessage{
tipsetKeyCid: tsCid50.Bytes(),
height: 50,
reverted: false,
messageCid: randomCid(t, rng).Bytes(),
messageIndex: 0,
})

si.gc(ctx)

// tipset at height 1 and 10 should be removed
var count int
err := si.db.QueryRow("SELECT COUNT(*) FROM tipset_message WHERE height = 1").Scan(&count)
require.NoError(t, err)
require.Equal(t, 0, count)

err = si.db.QueryRow("SELECT COUNT(*) FROM tipset_message WHERE height = 10").Scan(&count)
require.NoError(t, err)
require.Equal(t, 0, count)

// tipset at height 50 should not be removed
err = si.db.QueryRow("SELECT COUNT(*) FROM tipset_message WHERE height = 50").Scan(&count)
require.NoError(t, err)
require.Equal(t, 1, count)
type tipsetData struct {
height abi.ChainEpoch
reverted bool
}

tests := []struct {
name string
headHeight abi.ChainEpoch
gcRetentionEpochs int64
timestamp uint64 // Minimum timestamp for the head TipSet
tipsets []tipsetData
expectedEpochTipsetDataCounts map[abi.ChainEpoch]int // expected data count(tipsetMsg, event, eventEntry), for each epoch
expectedEthTxHashCount int // expected eth tx hash count after gc
}{
{
name: "Basic GC with some tipsets removed",
headHeight: headEpoch,
gcRetentionEpochs: validRetentionEpochs,
timestamp: 0,
tipsets: []tipsetData{
{height: epochOne, reverted: false},
{height: epochTen, reverted: false},
{height: epochFifty, reverted: false},
},
expectedEpochTipsetDataCounts: map[abi.ChainEpoch]int{
epochOne: 0, // Should be removed
epochTen: 0, // Should be removed
epochFifty: 1, // Should remain
},
expectedEthTxHashCount: 1, // Only the entry for height 50 should remain
},
{
name: "No GC when retention epochs is high",
headHeight: headEpoch,
gcRetentionEpochs: highRetentionEpochs,
timestamp: 0,
tipsets: []tipsetData{
{height: epochOne, reverted: false},
{height: epochTen, reverted: false},
{height: epochFifty, reverted: false},
},
expectedEpochTipsetDataCounts: map[abi.ChainEpoch]int{
epochOne: 1, // Should remain
epochTen: 1, // Should remain
epochFifty: 1, // Should remain
},
expectedEthTxHashCount: 3, // All entries should remain
},
{
name: "No GC when gcRetentionEpochs is zero",
headHeight: headEpoch,
gcRetentionEpochs: 0,
timestamp: 0,
tipsets: []tipsetData{
{height: epochOne, reverted: false},
{height: epochTen, reverted: false},
{height: epochFifty, reverted: false},
},
expectedEpochTipsetDataCounts: map[abi.ChainEpoch]int{
epochOne: 1, // Should remain
epochTen: 1, // Should remain
epochFifty: 1, // Should remain
},
expectedEthTxHashCount: 3, // All entries should remain
},
{
name: "GC should remove tipsets that are older than gcRetentionEpochs + gracEpochs",
headHeight: headEpoch,
gcRetentionEpochs: lowRetentionEpochs, // headHeight - gcRetentionEpochs - graceEpochs = 60 - 5 - 10 = 45 (removalEpoch)
timestamp: 0,
tipsets: []tipsetData{
{height: epochFifty, reverted: false},
{height: epochTen, reverted: false},
{height: epochOne, reverted: false},
},
expectedEpochTipsetDataCounts: map[abi.ChainEpoch]int{
epochOne: 0, // Should be removed
epochTen: 0, // Should be removed
epochFifty: 1, // Should remain
},
expectedEthTxHashCount: 1, // Only the entry for height 50 should remain
},
{
name: "skip gc if gcTime is zero",
headHeight: validRetentionEpochs + graceEpochs + 1, // adding 1 to headHeight to ensure removal epoch is not zero
gcRetentionEpochs: validRetentionEpochs, // removalEpoch = 1
timestamp: 300, // totalRetentionDuration = (20+10)*10 = 300 seconds
tipsets: []tipsetData{
{height: epochOne, reverted: false},
{height: epochTen, reverted: false},
{height: epochFifty, reverted: false},
},
expectedEpochTipsetDataCounts: map[abi.ChainEpoch]int{
epochOne: 1, // Should remain
epochTen: 1, // Should remain
epochFifty: 1, // Should remain
},
expectedEthTxHashCount: 3, // All entries should remain
},
{
name: "Skip GC when gcTime is negative",
headHeight: validRetentionEpochs + graceEpochs + 1, // adding 1 to headHeight to ensure removal epoch is not zero
gcRetentionEpochs: validRetentionEpochs, // removalEpoch = 1
timestamp: 200, // totalRetentionDuration = (20+10)*10 =300 seconds, gcTime = 200 -300 = -100 seconds
tipsets: []tipsetData{
{height: epochOne, reverted: false},
{height: epochTen, reverted: false},
{height: epochFifty, reverted: false},
},
expectedEpochTipsetDataCounts: map[abi.ChainEpoch]int{
epochOne: 1, // Should remain
epochTen: 1, // Should remain
epochFifty: 1, // Should remain
},
expectedEthTxHashCount: 3, // All entries should remain
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
genesisTime := time.Now()
rng := pseudo.New(pseudo.NewSource(genesisTime.UnixNano()))

// setup indexer with head tipset
ts := randomTipsetWithTimestamp(t, rng, tt.headHeight, []cid.Cid{}, tt.timestamp)
d := newDummyChainStore()
d.SetHeaviestTipSet(ts)
si, err := NewSqliteIndexer(":memory:", d, 0, false, 0)
require.NoError(t, err)
insertHead(t, si, ts, tt.headHeight)

// set gc retention epochs
si.gcRetentionEpochs = tt.gcRetentionEpochs

tipsetKeyCids := make(map[abi.ChainEpoch]cid.Cid)

for _, tsData := range tt.tipsets {
t.Logf("inserting tipset at height %d", tsData.height)

tsKeyCid, _, _ := insertRandomTipsetAtHeight(t, si, uint64(tsData.height), tsData.reverted, genesisTime)
tipsetKeyCids[tsData.height] = tsKeyCid
}

si.gc(ctx)

for height, expectedCount := range tt.expectedEpochTipsetDataCounts {
var count int

err := si.db.QueryRow("SELECT COUNT(*) FROM tipset_message WHERE height = ?", height).Scan(&count)
require.NoError(t, err)
require.Equal(t, expectedCount, count, "Unexpected tipset_message count for height %d", height)

tsKeyCid := tipsetKeyCids[height]
err = si.stmts.getNonRevertedTipsetEventCountStmt.QueryRow(tsKeyCid.Bytes()).Scan(&count)
require.NoError(t, err)
require.Equal(t, expectedCount, count, "Unexpected events count for height %d", height)

err = si.stmts.getNonRevertedTipsetEventEntriesCountStmt.QueryRow(tsKeyCid.Bytes()).Scan(&count)
require.NoError(t, err)
require.Equal(t, expectedCount, count, "Unexpected event_entries count for height %d", height)
}

var ethTxHashCount int
err = si.db.QueryRow("SELECT COUNT(*) FROM eth_tx_hash").Scan(&ethTxHashCount)
require.NoError(t, err)
require.Equal(t, tt.expectedEthTxHashCount, ethTxHashCount, "Unexpected eth_tx_hash count")

t.Cleanup(func() {
cleanup(t, si)
})
})
}
}
Loading

0 comments on commit f2bff6f

Please sign in to comment.