Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/internal/ingest: Add more fine-grained reap metrics #5385

Merged
merged 4 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ type IngestionQ interface {
NewTradeBatchInsertBuilder() TradeBatchInsertBuilder
RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error
RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32, roundingSlippageFilter int) error
ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, map[string]int64, error)
ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]LookupTableReapResult, error)
CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error)
QTransactions
QTrustLines
Expand Down Expand Up @@ -971,27 +971,27 @@ type tableObjectFieldPair struct {
objectField string
}

// LookupTableReapResult summarizes the outcome of reaping a single lookup
// table. ReapLookupTables returns one of these per table, keyed by table name.
type LookupTableReapResult struct {
// Offset is the new offset for the table, determined before its orphaned
// rows are removed; it is reset to 0 when the offset query returns no rows.
Offset int64
// RowsDeleted is the number of rows reported as affected by the reap query.
RowsDeleted int64
// Duration is the time elapsed between the start of this table's reap
// iteration and the moment its result is recorded.
Duration time.Duration
}

// ReapLookupTables removes rows from lookup tables like history_claimable_balances
// which aren't used (orphaned), i.e. history entries for them were reaped.
// This method must be executed inside ingestion transaction. Otherwise it may
// create invalid state in lookup and history tables.
func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
map[string]int64, // deleted rows count
map[string]int64, // new offsets
map[string]LookupTableReapResult,
error,
) {
if q.GetTx() == nil {
return nil, nil, errors.New("cannot be called outside of an ingestion transaction")
return nil, errors.New("cannot be called outside of an ingestion transaction")
}

const batchSize = 1000

deletedCount := make(map[string]int64)

if offsets == nil {
offsets = make(map[string]int64)
}

results := map[string]LookupTableReapResult{}
for table, historyTables := range map[string][]tableObjectFieldPair{
"history_accounts": {
{
Expand Down Expand Up @@ -1054,9 +1054,10 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
},
},
} {
startTime := time.Now()
query, err := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table])
if err != nil {
return nil, nil, errors.Wrap(err, "error constructing a query")
return nil, errors.Wrap(err, "error constructing a query")
}

// Find new offset before removing the rows
Expand All @@ -1066,7 +1067,7 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
if q.NoRows(err) {
newOffset = 0
} else {
return nil, nil, err
return nil, err
}
}

Expand All @@ -1075,18 +1076,21 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
query,
)
if err != nil {
return nil, nil, errors.Wrapf(err, "error running query: %s", query)
return nil, errors.Wrapf(err, "error running query: %s", query)
}

rows, err := res.RowsAffected()
if err != nil {
return nil, nil, errors.Wrapf(err, "error running RowsAffected after query: %s", query)
return nil, errors.Wrapf(err, "error running RowsAffected after query: %s", query)
}

deletedCount[table] = rows
offsets[table] = newOffset
results[table] = LookupTableReapResult{
Offset: newOffset,
RowsDeleted: rows,
Duration: time.Since(startTime),
}
}
return deletedCount, offsets, nil
return results, nil
}

// constructReapLookupTablesQuery creates a query like (using history_claimable_balances
Expand Down
20 changes: 10 additions & 10 deletions services/horizon/internal/db2/history/reap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestReapLookupTables(t *testing.T) {
err = q.Begin(tt.Ctx)
tt.Require.NoError(err)

deletedCount, newOffsets, err := q.ReapLookupTables(tt.Ctx, nil)
results, err := q.ReapLookupTables(tt.Ctx, nil)
tt.Require.NoError(err)

err = q.Commit()
Expand All @@ -77,23 +77,23 @@ func TestReapLookupTables(t *testing.T) {

tt.Assert.Equal(25, prevAccounts, "prevAccounts")
tt.Assert.Equal(1, curAccounts, "curAccounts")
tt.Assert.Equal(int64(24), deletedCount["history_accounts"], `deletedCount["history_accounts"]`)
tt.Assert.Equal(int64(24), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`)

tt.Assert.Equal(7, prevAssets, "prevAssets")
tt.Assert.Equal(0, curAssets, "curAssets")
tt.Assert.Equal(int64(7), deletedCount["history_assets"], `deletedCount["history_assets"]`)
tt.Assert.Equal(int64(7), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`)

tt.Assert.Equal(1, prevClaimableBalances, "prevClaimableBalances")
tt.Assert.Equal(0, curClaimableBalances, "curClaimableBalances")
tt.Assert.Equal(int64(1), deletedCount["history_claimable_balances"], `deletedCount["history_claimable_balances"]`)
tt.Assert.Equal(int64(1), results["history_claimable_balances"].RowsDeleted, `deletedCount["history_claimable_balances"]`)

tt.Assert.Equal(1, prevLiquidityPools, "prevLiquidityPools")
tt.Assert.Equal(0, curLiquidityPools, "curLiquidityPools")
tt.Assert.Equal(int64(1), deletedCount["history_liquidity_pools"], `deletedCount["history_liquidity_pools"]`)
tt.Assert.Equal(int64(1), results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`)

tt.Assert.Len(newOffsets, 4)
tt.Assert.Equal(int64(0), newOffsets["history_accounts"])
tt.Assert.Equal(int64(0), newOffsets["history_assets"])
tt.Assert.Equal(int64(0), newOffsets["history_claimable_balances"])
tt.Assert.Equal(int64(0), newOffsets["history_liquidity_pools"])
tt.Assert.Len(results, 4)
tt.Assert.Equal(int64(0), results["history_accounts"].Offset)
tt.Assert.Equal(int64(0), results["history_assets"].Offset)
tt.Assert.Equal(int64(0), results["history_claimable_balances"].Offset)
tt.Assert.Equal(int64(0), results["history_liquidity_pools"].Offset)
}
39 changes: 24 additions & 15 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,8 @@ type Metrics struct {
// duration of rebuilding trade aggregation buckets.
LedgerIngestionTradeAggregationDuration prometheus.Summary

// LedgerIngestionReapLookupTablesDuration exposes timing metrics about the rate and
// duration of reaping lookup tables.
LedgerIngestionReapLookupTablesDuration prometheus.Summary
ReapDurationByLookupTable *prometheus.SummaryVec
RowsReapedByLookupTable *prometheus.SummaryVec

// StateVerifyDuration exposes timing metrics about the rate and
// duration of state verification.
Expand Down Expand Up @@ -327,6 +326,7 @@ func NewSystem(config Config) (System, error) {
config.ReapConfig,
config.HistorySession,
),
reapOffsets: map[string]int64{},
tamirms marked this conversation as resolved.
Show resolved Hide resolved
}

system.initMetrics()
Expand Down Expand Up @@ -367,11 +367,17 @@ func (s *system) initMetrics() {
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
})

s.metrics.LedgerIngestionReapLookupTablesDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "ledger_ingestion_reap_lookup_tables_duration_seconds",
Help: "ledger ingestion reap lookup tables durations, sliding window = 10m",
s.metrics.ReapDurationByLookupTable = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "reap_lookup_tables_duration_seconds",
Help: "reap lookup tables durations, sliding window = 10m",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
})
}, []string{"table"})

s.metrics.RowsReapedByLookupTable = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "reap_lookup_tables_rows_reaped",
Help: "rows delated during lookup tables reap, sliding window = 10m",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}, []string{"table"})

s.metrics.StateVerifyDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "state_verify_duration_seconds",
Expand Down Expand Up @@ -490,7 +496,8 @@ func (s *system) RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(s.metrics.LocalLatestLedger)
registry.MustRegister(s.metrics.LedgerIngestionDuration)
registry.MustRegister(s.metrics.LedgerIngestionTradeAggregationDuration)
registry.MustRegister(s.metrics.LedgerIngestionReapLookupTablesDuration)
registry.MustRegister(s.metrics.ReapDurationByLookupTable)
registry.MustRegister(s.metrics.RowsReapedByLookupTable)
registry.MustRegister(s.metrics.StateVerifyDuration)
registry.MustRegister(s.metrics.StateInvalidGauge)
registry.MustRegister(s.metrics.LedgerStatsCounter)
Expand Down Expand Up @@ -793,7 +800,7 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) {
defer cancel()

reapStart := time.Now()
deletedCount, newOffsets, err := s.historyQ.ReapLookupTables(ctx, s.reapOffsets)
results, err := s.historyQ.ReapLookupTables(ctx, s.reapOffsets)
if err != nil {
log.WithError(err).Warn("Error reaping lookup tables")
return
Expand All @@ -807,18 +814,20 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) {

totalDeleted := int64(0)
reapLog := log
for table, c := range deletedCount {
totalDeleted += c
reapLog = reapLog.WithField(table, c)
for table, result := range results {
totalDeleted += result.RowsDeleted
reapLog = reapLog.WithField(table, result)
s.reapOffsets[table] = result.Offset
s.Metrics().RowsReapedByLookupTable.With(prometheus.Labels{"table": table}).Observe(float64(result.RowsDeleted))
s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": table}).Observe(result.Duration.Seconds())
}

if totalDeleted > 0 {
reapLog.Info("Reaper deleted rows from lookup tables")
}

s.reapOffsets = newOffsets
reapDuration := time.Since(reapStart).Seconds()
s.Metrics().LedgerIngestionReapLookupTablesDuration.Observe(float64(reapDuration))
s.Metrics().RowsReapedByLookupTable.With(prometheus.Labels{"table": "total"}).Observe(float64(totalDeleted))
s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": "total"}).Observe(time.Since(reapStart).Seconds())
}

func (s *system) incrementStateVerificationErrors() int {
Expand Down
11 changes: 4 additions & 7 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,16 +562,13 @@ func (m *mockDBQ) NewTradeBatchInsertBuilder() history.TradeBatchInsertBuilder {
return args.Get(0).(history.TradeBatchInsertBuilder)
}

// ReapLookupTables is the mockDBQ stand-in for the history reaper call. It
// records the invocation through m.Called and hands back the values that were
// configured on the mock; a nil first value is returned as a nil map.
func (m *mockDBQ) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, map[string]int64, error) {
func (m *mockDBQ) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]history.LookupTableReapResult, error) {
args := m.Called(ctx, offsets)
var r1, r2 map[string]int64
var r1 map[string]history.LookupTableReapResult
// Only type-assert when a value was configured, so a nil return stays nil.
if args.Get(0) != nil {
r1 = args.Get(0).(map[string]int64)
r1 = args.Get(0).(map[string]history.LookupTableReapResult)
}
if args.Get(1) != nil {
r1 = args.Get(1).(map[string]int64)
}
return r1, r2, args.Error(2)
// NOTE(review): the two-result signature puts the error at index 1, but this
// still reads index 2 (left over from the three-result version). Presumably
// this should be args.Error(1) — confirm against the mock library's indexing.
return r1, args.Error(2)
}

func (m *mockDBQ) RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error {
Expand Down
Loading