From 4f10b54b487516b70de5e975607ed758accbf9ca Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Wed, 13 Nov 2024 11:39:53 -0500 Subject: [PATCH] Add `getLedgers` implementation (#303) Add getLedgers implementation --- cmd/soroban-rpc/internal/config/main.go | 4 + cmd/soroban-rpc/internal/config/options.go | 35 +++ cmd/soroban-rpc/internal/db/ledger.go | 122 ++++++-- cmd/soroban-rpc/internal/db/ledger_test.go | 59 ++-- cmd/soroban-rpc/internal/db/mocks.go | 4 + .../integrationtest/get_ledgers_test.go | 71 +++++ cmd/soroban-rpc/internal/jsonrpc.go | 8 + .../methods/get_latest_ledger_test.go | 5 + .../internal/methods/get_ledgers.go | 252 +++++++++++++++ .../internal/methods/get_ledgers_test.go | 286 ++++++++++++++++++ cmd/soroban-rpc/internal/methods/json.go | 17 ++ 11 files changed, 821 insertions(+), 42 deletions(-) create mode 100644 cmd/soroban-rpc/internal/integrationtest/get_ledgers_test.go create mode 100644 cmd/soroban-rpc/internal/methods/get_ledgers.go create mode 100644 cmd/soroban-rpc/internal/methods/get_ledgers_test.go diff --git a/cmd/soroban-rpc/internal/config/main.go b/cmd/soroban-rpc/internal/config/main.go index 26a38802..f0a19522 100644 --- a/cmd/soroban-rpc/internal/config/main.go +++ b/cmd/soroban-rpc/internal/config/main.go @@ -26,6 +26,7 @@ type Config struct { CoreRequestTimeout time.Duration DefaultEventsLimit uint DefaultTransactionsLimit uint + DefaultLedgersLimit uint FriendbotURL string HistoryArchiveURLs []string HistoryArchiveUserAgent string @@ -34,6 +35,7 @@ type Config struct { LogLevel logrus.Level MaxEventsLimit uint MaxTransactionsLimit uint + MaxLedgersLimit uint MaxHealthyLedgerLatency time.Duration NetworkPassphrase string PreflightWorkerCount uint @@ -52,6 +54,7 @@ type Config struct { RequestBacklogGetLedgerEntriesQueueLimit uint RequestBacklogGetTransactionQueueLimit uint RequestBacklogGetTransactionsQueueLimit uint + RequestBacklogGetLedgersQueueLimit uint RequestBacklogSendTransactionQueueLimit uint RequestBacklogSimulateTransactionQueueLimit uint RequestBacklogGetFeeStatsTransactionQueueLimit uint @@ -65,6 +68,7 @@ type Config struct { MaxGetLedgerEntriesExecutionDuration time.Duration MaxGetTransactionExecutionDuration time.Duration MaxGetTransactionsExecutionDuration time.Duration + MaxGetLedgersExecutionDuration time.Duration MaxSendTransactionExecutionDuration time.Duration MaxSimulateTransactionExecutionDuration time.Duration MaxGetFeeStatsExecutionDuration time.Duration diff --git a/cmd/soroban-rpc/internal/config/options.go b/cmd/soroban-rpc/internal/config/options.go index 5fde5f6f..1a906c80 100644 --- a/cmd/soroban-rpc/internal/config/options.go +++ b/cmd/soroban-rpc/internal/config/options.go @@ -282,6 +282,28 @@ func (cfg *Config) options() Options { return nil }, }, + { + Name: "max-ledgers-limit", + Usage: "Maximum amount of ledgers allowed in a single getLedgers response", + ConfigKey: &cfg.MaxLedgersLimit, + DefaultValue: uint(200), + }, + { + Name: "default-ledgers-limit", + Usage: "Default cap on the amount of ledgers included in a single getLedgers response", + ConfigKey: &cfg.DefaultLedgersLimit, + DefaultValue: uint(50), + Validate: func(_ *Option) error { + if cfg.DefaultLedgersLimit > cfg.MaxLedgersLimit { + return fmt.Errorf( + "default-ledgers-limit (%v) cannot exceed max-ledgers-limit (%v)", + cfg.DefaultLedgersLimit, + cfg.MaxLedgersLimit, + ) + } + return nil + }, + }, { Name: "max-healthy-ledger-latency", Usage: "maximum ledger latency (i.e. time elapsed since the last known ledger closing time) considered to be healthy" + @@ -372,6 +394,13 @@ func (cfg *Config) options() Options { DefaultValue: uint(1000), Validate: positive, }, + { + TomlKey: strutils.KebabToConstantCase("request-backlog-get-ledgers-queue-limit"), + Usage: "Maximum number of outstanding getLedgers requests", + ConfigKey: &cfg.RequestBacklogGetLedgersQueueLimit, + DefaultValue: uint(1000), + Validate: positive, + }, { TomlKey: strutils.KebabToConstantCase("request-backlog-send-transaction-queue-limit"), Usage: "Maximum number of outstanding SendTransaction requests", @@ -453,6 +482,12 @@ func (cfg *Config) options() Options { ConfigKey: &cfg.MaxGetTransactionsExecutionDuration, DefaultValue: 5 * time.Second, }, + { + TomlKey: strutils.KebabToConstantCase("max-get-ledgers-execution-duration"), + Usage: "The maximum duration of time allowed for processing a getLedgers request. When that time elapses, the rpc server would return -32001 and abort the request's execution", + ConfigKey: &cfg.MaxGetLedgersExecutionDuration, + DefaultValue: 5 * time.Second, + }, { TomlKey: strutils.KebabToConstantCase("max-send-transaction-execution-duration"), Usage: "The maximum duration of time allowed for processing a sendTransaction request. When that time elapses, the rpc server would return -32001 and abort the request's execution", diff --git a/cmd/soroban-rpc/internal/db/ledger.go b/cmd/soroban-rpc/internal/db/ledger.go index 860e6aa1..730a2da6 100644 --- a/cmd/soroban-rpc/internal/db/ledger.go +++ b/cmd/soroban-rpc/internal/db/ledger.go @@ -2,10 +2,12 @@ package db import ( "context" + "database/sql" "fmt" sq "github.com/Masterminds/squirrel" + "github.com/stellar/go/support/db" "github.com/stellar/go/xdr" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow" @@ -22,20 +24,82 @@ type LedgerReader interface { StreamAllLedgers(ctx context.Context, f StreamLedgerFn) error GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error) StreamLedgerRange(ctx context.Context, startLedger uint32, endLedger uint32, f StreamLedgerFn) error + NewTx(ctx context.Context) (LedgerReaderTx, error) +} + +type LedgerReaderTx interface { + GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error) + BatchGetLedgers(ctx context.Context, sequence uint32, batchSize uint) ([]xdr.LedgerCloseMeta, error) + Done() error } type LedgerWriter interface { InsertLedger(ledger xdr.LedgerCloseMeta) error } +type readDB interface { + Select(ctx context.Context, dest interface{}, query sq.Sqlizer) error +} + type ledgerReader struct { db *DB } +type ledgerReaderTx struct { + tx db.SessionInterface + latestLedgerSeq uint32 + latestLedgerCloseTime int64 +} + +func (l ledgerReaderTx) GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error) { + if l.latestLedgerSeq != 0 { + return getLedgerRangeWithCache(ctx, l.tx, l.latestLedgerSeq, l.latestLedgerCloseTime) + } + return getLedgerRangeWithoutCache(ctx, l.tx) +} + +// BatchGetLedgers fetches ledgers in batches from the db. +func (l ledgerReaderTx) BatchGetLedgers(ctx context.Context, sequence uint32, + batchSize uint, +) ([]xdr.LedgerCloseMeta, error) { + sql := sq.Select("meta"). + From(ledgerCloseMetaTableName). + Where(sq.And{ + sq.GtOrEq{"sequence": sequence}, + sq.LtOrEq{"sequence": sequence + uint32(batchSize) - 1}, + }) + + results := make([]xdr.LedgerCloseMeta, 0, batchSize) + if err := l.tx.Select(ctx, &results, sql); err != nil { + return nil, err + } + + return results, nil +} + +func (l ledgerReaderTx) Done() error { + return l.tx.Rollback() +} + func NewLedgerReader(db *DB) LedgerReader { return ledgerReader{db: db} } +func (r ledgerReader) NewTx(ctx context.Context) (LedgerReaderTx, error) { + r.db.cache.RLock() + defer r.db.cache.RUnlock() + txSession := r.db.Clone() + if err := txSession.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}); err != nil { + return nil, fmt.Errorf("failed to begin read transaction: %w", err) + } + tx := ledgerReaderTx{ + tx: txSession, + latestLedgerSeq: r.db.cache.latestLedgerSeq, + latestLedgerCloseTime: r.db.cache.latestLedgerCloseTime, + } + return tx, nil +} + // StreamAllLedgers runs f over all the ledgers in the database (until f errors or signals it's done). func (r ledgerReader) StreamAllLedgers(ctx context.Context, f StreamLedgerFn) error { sql := sq.Select("meta").From(ledgerCloseMetaTableName).OrderBy("sequence asc") @@ -112,32 +176,44 @@ func (r ledgerReader) GetLedgerRange(ctx context.Context) (ledgerbucketwindow.Le // Make use of the cached latest ledger seq and close time to query only the oldest ledger details. if latestLedgerSeqCache != 0 { - query := sq.Select("meta"). - From(ledgerCloseMetaTableName). - Where( - fmt.Sprintf("sequence = (SELECT MIN(sequence) FROM %s)", ledgerCloseMetaTableName), - ) - var lcm []xdr.LedgerCloseMeta - if err := r.db.Select(ctx, &lcm, query); err != nil { - return ledgerbucketwindow.LedgerRange{}, fmt.Errorf("couldn't query ledger range: %w", err) - } + return getLedgerRangeWithCache(ctx, r.db, latestLedgerSeqCache, latestLedgerCloseTimeCache) + } + return getLedgerRangeWithoutCache(ctx, r.db) +} - if len(lcm) == 0 { - return ledgerbucketwindow.LedgerRange{}, ErrEmptyDB - } +// getLedgerRangeWithCache uses the latest ledger cache to optimize the query. +// It only needs to look up the first ledger since we have the latest cached. +func getLedgerRangeWithCache(ctx context.Context, db readDB, + latestSeq uint32, latestTime int64, +) (ledgerbucketwindow.LedgerRange, error) { + query := sq.Select("meta"). + From(ledgerCloseMetaTableName). + Where( + fmt.Sprintf("sequence = (SELECT MIN(sequence) FROM %s)", ledgerCloseMetaTableName), + ) + var lcm []xdr.LedgerCloseMeta + if err := db.Select(ctx, &lcm, query); err != nil { + return ledgerbucketwindow.LedgerRange{}, fmt.Errorf("couldn't query ledger range: %w", err) + } - return ledgerbucketwindow.LedgerRange{ - FirstLedger: ledgerbucketwindow.LedgerInfo{ - Sequence: lcm[0].LedgerSequence(), - CloseTime: lcm[0].LedgerCloseTime(), - }, - LastLedger: ledgerbucketwindow.LedgerInfo{ - Sequence: latestLedgerSeqCache, - CloseTime: latestLedgerCloseTimeCache, - }, - }, nil + if len(lcm) == 0 { + return ledgerbucketwindow.LedgerRange{}, ErrEmptyDB } + return ledgerbucketwindow.LedgerRange{ + FirstLedger: ledgerbucketwindow.LedgerInfo{ + Sequence: lcm[0].LedgerSequence(), + CloseTime: lcm[0].LedgerCloseTime(), + }, + LastLedger: ledgerbucketwindow.LedgerInfo{ + Sequence: latestSeq, + CloseTime: latestTime, + }, + }, nil +} + +// getLedgerRangeWithoutCache queries both the first and last ledger when cache isn't available +func getLedgerRangeWithoutCache(ctx context.Context, db readDB) (ledgerbucketwindow.LedgerRange, error) { query := sq.Select("lcm.meta"). From(ledgerCloseMetaTableName + " as lcm"). Where(sq.Or{ @@ -146,7 +222,7 @@ func (r ledgerReader) GetLedgerRange(ctx context.Context) (ledgerbucketwindow.Le }).OrderBy("lcm.sequence ASC") var lcms []xdr.LedgerCloseMeta - if err := r.db.Select(ctx, &lcms, query); err != nil { + if err := db.Select(ctx, &lcms, query); err != nil { return ledgerbucketwindow.LedgerRange{}, fmt.Errorf("couldn't query ledger range: %w", err) } diff --git a/cmd/soroban-rpc/internal/db/ledger_test.go b/cmd/soroban-rpc/internal/db/ledger_test.go index 1b8b2fc0..d2fc7705 100644 --- a/cmd/soroban-rpc/internal/db/ledger_test.go +++ b/cmd/soroban-rpc/internal/db/ledger_test.go @@ -183,25 +183,8 @@ func TestGetLedgerRange_EmptyDB(t *testing.T) { } func BenchmarkGetLedgerRange(b *testing.B) { - db := NewTestDB(b) - logger := log.DefaultLogger - writer := NewReadWriter(logger, db, interfaces.MakeNoOpDeamon(), 100, 1_000_000, passphrase) - write, err := writer.NewTx(context.TODO()) - require.NoError(b, err) - - // create 100k tx rows - lcms := make([]xdr.LedgerCloseMeta, 0, 100_000) - for i := range cap(lcms) { - lcms = append(lcms, txMeta(uint32(1234+i), i%2 == 0)) - } - - ledgerW, txW := write.LedgerWriter(), write.TransactionWriter() - for _, lcm := range lcms { - require.NoError(b, ledgerW.InsertLedger(lcm)) - require.NoError(b, txW.InsertTransactions(lcm)) - } - require.NoError(b, write.Commit(lcms[len(lcms)-1])) - reader := NewLedgerReader(db) + testDB, lcms := setupBenchmarkingDB(b) + reader := NewLedgerReader(testDB) b.ResetTimer() for range b.N { @@ -212,6 +195,22 @@ func BenchmarkGetLedgerRange(b *testing.B) { } } +func BenchmarkBatchGetLedgers(b *testing.B) { + testDB, lcms := setupBenchmarkingDB(b) + reader := NewLedgerReader(testDB) + readTx, err := reader.NewTx(context.Background()) + require.NoError(b, err) + batchSize := uint(200) // using the current maximum value for getLedgers endpoint + + b.ResetTimer() + for range b.N { + ledgers, err := readTx.BatchGetLedgers(context.TODO(), 1334, batchSize) + require.NoError(b, err) + assert.Equal(b, lcms[0].LedgerSequence(), ledgers[0].LedgerSequence()) + assert.Equal(b, lcms[batchSize-1].LedgerSequence(), ledgers[batchSize-1].LedgerSequence()) + } +} + func NewTestDB(tb testing.TB) *DB { tmp := tb.TempDir() dbPath := path.Join(tmp, "db.sqlite") @@ -222,3 +221,25 @@ func NewTestDB(tb testing.TB) *DB { }) return db } + +func setupBenchmarkingDB(b *testing.B) (*DB, []xdr.LedgerCloseMeta) { + testDB := NewTestDB(b) + logger := log.DefaultLogger + writer := NewReadWriter(logger, testDB, interfaces.MakeNoOpDeamon(), + 100, 1_000_000, passphrase) + write, err := writer.NewTx(context.TODO()) + require.NoError(b, err) + + lcms := make([]xdr.LedgerCloseMeta, 0, 100_000) + for i := range cap(lcms) { + lcms = append(lcms, txMeta(uint32(1234+i), i%2 == 0)) + } + + ledgerW, txW := write.LedgerWriter(), write.TransactionWriter() + for _, lcm := range lcms { + require.NoError(b, ledgerW.InsertLedger(lcm)) + require.NoError(b, txW.InsertTransactions(lcm)) + } + require.NoError(b, write.Commit(lcms[len(lcms)-1])) + return testDB, lcms +} diff --git a/cmd/soroban-rpc/internal/db/mocks.go b/cmd/soroban-rpc/internal/db/mocks.go index fee538d9..4868ced4 100644 --- a/cmd/soroban-rpc/internal/db/mocks.go +++ b/cmd/soroban-rpc/internal/db/mocks.go @@ -109,6 +109,10 @@ func (m *MockLedgerReader) GetLedgerRange(_ context.Context) (ledgerbucketwindow return m.txn.ledgerRange, nil } +func (m *MockLedgerReader) NewTx(_ context.Context) (LedgerReaderTx, error) { + return nil, errors.New("mock NewTx error") +} + var ( _ TransactionReader = &MockTransactionHandler{} _ TransactionWriter = &MockTransactionHandler{} diff --git a/cmd/soroban-rpc/internal/integrationtest/get_ledgers_test.go b/cmd/soroban-rpc/internal/integrationtest/get_ledgers_test.go new file mode 100644 index 00000000..d8d0aa2b --- /dev/null +++ b/cmd/soroban-rpc/internal/integrationtest/get_ledgers_test.go @@ -0,0 +1,71 @@ +package integrationtest + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/integrationtest/infrastructure" + "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/methods" +) + +func TestGetLedgers(t *testing.T) { + test := infrastructure.NewTest(t, nil) + client := test.GetRPCLient() + + // Get all ledgers + request := methods.GetLedgersRequest{ + StartLedger: 8, + Pagination: &methods.LedgerPaginationOptions{ + Limit: 3, + }, + } + var result methods.GetLedgersResponse + err := client.CallResult(context.Background(), "getLedgers", request, &result) + require.NoError(t, err) + assert.Len(t, result.Ledgers, 3) + prevLedgers := result.Ledgers + + // Get ledgers using previous result's cursor + request = methods.GetLedgersRequest{ + Pagination: &methods.LedgerPaginationOptions{ + Cursor: result.Cursor, + Limit: 2, + }, + } + err = client.CallResult(context.Background(), "getLedgers", request, &result) + require.NoError(t, err) + assert.Len(t, result.Ledgers, 2) + assert.Equal(t, prevLedgers[len(prevLedgers)-1].Sequence+1, result.Ledgers[0].Sequence) + + // Test with JSON format + request = methods.GetLedgersRequest{ + StartLedger: 8, + Pagination: &methods.LedgerPaginationOptions{ + Limit: 1, + }, + Format: methods.FormatJSON, + } + err = client.CallResult(context.Background(), "getLedgers", request, &result) + require.NoError(t, err) + assert.NotEmpty(t, result.Ledgers[0].LedgerHeaderJSON) + assert.NotEmpty(t, result.Ledgers[0].LedgerMetadataJSON) + + // Test invalid requests + invalidRequests := []methods.GetLedgersRequest{ + {StartLedger: result.OldestLedger - 1}, + {StartLedger: result.LatestLedger + 1}, + { + Pagination: &methods.LedgerPaginationOptions{ + Cursor: "invalid", + }, + }, + } + + for _, req := range invalidRequests { + err = client.CallResult(context.Background(), "getLedgers", req, &result) + assert.Error(t, err) + } +} diff --git a/cmd/soroban-rpc/internal/jsonrpc.go b/cmd/soroban-rpc/internal/jsonrpc.go index 66ca7bd1..c92d0070 100644 --- a/cmd/soroban-rpc/internal/jsonrpc.go +++ b/cmd/soroban-rpc/internal/jsonrpc.go @@ -199,6 +199,14 @@ func NewJSONRPCHandler(cfg *config.Config, params HandlerParams) Handler { queueLimit: cfg.RequestBacklogGetLatestLedgerQueueLimit, requestDurationLimit: cfg.MaxGetLatestLedgerExecutionDuration, }, + { + methodName: "getLedgers", + underlyingHandler: methods.NewGetLedgersHandler(params.LedgerReader, + cfg.MaxLedgersLimit, cfg.DefaultLedgersLimit), + longName: "get_ledgers", + queueLimit: cfg.RequestBacklogGetLedgersQueueLimit, + requestDurationLimit: cfg.MaxGetLedgersExecutionDuration, + }, { methodName: "getLedgerEntries", underlyingHandler: methods.NewGetLedgerEntriesHandler(params.Logger, params.LedgerEntryReader), diff --git a/cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go b/cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go index 0fcb4494..251b3e34 100644 --- a/cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go +++ b/cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go @@ -2,6 +2,7 @@ package methods import ( "context" + "errors" "testing" "github.com/creachadair/jrpc2" @@ -30,6 +31,10 @@ func (ledgerReader *ConstantLedgerReader) GetLedgerRange(_ context.Context) (led return ledgerbucketwindow.LedgerRange{}, nil } +func (ledgerReader *ConstantLedgerReader) NewTx(_ context.Context) (db.LedgerReaderTx, error) { + return nil, errors.New("mock NewTx error") +} + func (entryReader *ConstantLedgerEntryReader) GetLatestLedgerSequence(_ context.Context) (uint32, error) { return expectedLatestLedgerSequence, nil } diff --git a/cmd/soroban-rpc/internal/methods/get_ledgers.go b/cmd/soroban-rpc/internal/methods/get_ledgers.go new file mode 100644 index 00000000..0d63e85e --- /dev/null +++ b/cmd/soroban-rpc/internal/methods/get_ledgers.go @@ -0,0 +1,252 @@ +package methods + +import ( + "context" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "strconv" + + "github.com/creachadair/jrpc2" + + "github.com/stellar/go/xdr" + + "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db" + "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow" +) + +type LedgerPaginationOptions struct { + Cursor string `json:"cursor,omitempty"` + Limit uint `json:"limit,omitempty"` +} + +// isStartLedgerWithinBounds checks whether the request start ledger/cursor is within the max/min ledger +// for the current RPC instance. +func isStartLedgerWithinBounds(startLedger uint32, ledgerRange ledgerbucketwindow.LedgerRange) bool { + return startLedger >= ledgerRange.FirstLedger.Sequence && startLedger <= ledgerRange.LastLedger.Sequence +} + +// GetLedgersRequest represents the request parameters for fetching ledgers. +type GetLedgersRequest struct { + StartLedger uint32 `json:"startLedger"` + Pagination *LedgerPaginationOptions `json:"pagination,omitempty"` + Format string `json:"xdrFormat,omitempty"` +} + +// validate checks the validity of the request parameters. +func (req *GetLedgersRequest) validate(maxLimit uint, ledgerRange ledgerbucketwindow.LedgerRange) error { + switch { + case req.Pagination != nil: + switch { + case req.Pagination.Cursor != "" && req.StartLedger != 0: + return errors.New("startLedger and cursor cannot both be set") + case req.Pagination.Limit > maxLimit: + return fmt.Errorf("limit must not exceed %d", maxLimit) + } + case req.StartLedger != 0 && !isStartLedgerWithinBounds(req.StartLedger, ledgerRange): + return fmt.Errorf( + "start ledger must be between the oldest ledger: %d and the latest ledger: %d for this rpc instance", + ledgerRange.FirstLedger.Sequence, + ledgerRange.LastLedger.Sequence, + ) + } + + return IsValidFormat(req.Format) +} + +// LedgerInfo represents a single ledger in the response. +type LedgerInfo struct { + Hash string `json:"hash"` + Sequence uint32 `json:"sequence"` + LedgerCloseTime int64 `json:"ledgerCloseTime,string"` + + LedgerHeader string `json:"headerXdr"` + LedgerHeaderJSON json.RawMessage `json:"headerJson,omitempty"` + + LedgerMetadata string `json:"metadataXdr"` + LedgerMetadataJSON json.RawMessage `json:"metadataJson,omitempty"` +} + +// GetLedgersResponse encapsulates the response structure for getLedgers queries. +type GetLedgersResponse struct { + Ledgers []LedgerInfo `json:"ledgers"` + LatestLedger uint32 `json:"latestLedger"` + LatestLedgerCloseTime int64 `json:"latestLedgerCloseTime"` + OldestLedger uint32 `json:"oldestLedger"` + OldestLedgerCloseTime int64 `json:"oldestLedgerCloseTime"` + Cursor string `json:"cursor"` +} + +type ledgersHandler struct { + ledgerReader db.LedgerReader + maxLimit uint + defaultLimit uint +} + +// NewGetLedgersHandler returns a jrpc2.Handler for the getLedgers method. +func NewGetLedgersHandler(ledgerReader db.LedgerReader, maxLimit, defaultLimit uint) jrpc2.Handler { + return NewHandler((&ledgersHandler{ + ledgerReader: ledgerReader, + maxLimit: maxLimit, + defaultLimit: defaultLimit, + }).getLedgers) +} + +// getLedgers fetch ledgers and relevant metadata from DB. +func (h ledgersHandler) getLedgers(ctx context.Context, request GetLedgersRequest) (GetLedgersResponse, error) { + readTx, err := h.ledgerReader.NewTx(ctx) + if err != nil { + return GetLedgersResponse{}, &jrpc2.Error{ + Code: jrpc2.InternalError, + Message: err.Error(), + } + } + defer func() { + _ = readTx.Done() + }() + + ledgerRange, err := readTx.GetLedgerRange(ctx) + if err != nil { + return GetLedgersResponse{}, &jrpc2.Error{ + Code: jrpc2.InternalError, + Message: err.Error(), + } + } + + if err := request.validate(h.maxLimit, ledgerRange); err != nil { + return GetLedgersResponse{}, &jrpc2.Error{ + Code: jrpc2.InvalidRequest, + Message: err.Error(), + } + } + + start, limit, err := h.initializePagination(request, ledgerRange) + if err != nil { + return GetLedgersResponse{}, &jrpc2.Error{ + Code: jrpc2.InvalidParams, + Message: err.Error(), + } + } + + ledgers, err := h.fetchLedgers(ctx, start, limit, request.Format, readTx) + if err != nil { + return GetLedgersResponse{}, err + } + cursor := strconv.Itoa(int(ledgers[len(ledgers)-1].Sequence)) + + return GetLedgersResponse{ + Ledgers: ledgers, + LatestLedger: ledgerRange.LastLedger.Sequence, + LatestLedgerCloseTime: ledgerRange.LastLedger.CloseTime, + OldestLedger: ledgerRange.FirstLedger.Sequence, + OldestLedgerCloseTime: ledgerRange.FirstLedger.CloseTime, + Cursor: cursor, + }, nil +} + +// initializePagination parses the request pagination details and initializes the cursor. +func (h ledgersHandler) initializePagination(request GetLedgersRequest, + ledgerRange ledgerbucketwindow.LedgerRange, +) (uint32, uint, error) { + if request.Pagination == nil { + return request.StartLedger, h.defaultLimit, nil + } + + start := request.StartLedger + var err error + if request.Pagination.Cursor != "" { + start, err = h.parseCursor(request.Pagination.Cursor, ledgerRange) + if err != nil { + return 0, 0, err + } + } + + limit := request.Pagination.Limit + if limit <= 0 { + limit = h.defaultLimit + } + return start, limit, nil +} + +func (h ledgersHandler) parseCursor(cursor string, ledgerRange ledgerbucketwindow.LedgerRange) (uint32, error) { + cursorInt, err := strconv.ParseUint(cursor, 10, 32) + if err != nil { + return 0, err + } + + start := uint32(cursorInt) + 1 + if !isStartLedgerWithinBounds(start, ledgerRange) { + return 0, fmt.Errorf( + "cursor must be between the oldest ledger: %d and the latest ledger: %d for this rpc instance", + ledgerRange.FirstLedger.Sequence, + ledgerRange.LastLedger.Sequence, + ) + } + + return start, nil +} + +// fetchLedgers fetches ledgers from the DB for the range [start, start+limit-1] +func (h ledgersHandler) fetchLedgers(ctx context.Context, start uint32, + limit uint, format string, readTx db.LedgerReaderTx, +) ([]LedgerInfo, error) { + ledgers, err := readTx.BatchGetLedgers(ctx, start, limit) + if err != nil { + return nil, &jrpc2.Error{ + Code: jrpc2.InternalError, + Message: fmt.Sprintf("error fetching ledgers from db: %v", err), + } + } + + result := make([]LedgerInfo, 0, limit) + for _, ledger := range ledgers { + if uint(len(result)) >= limit { + break + } + + ledgerInfo, err := h.parseLedgerInfo(ledger, format) + if err != nil { + return nil, &jrpc2.Error{ + Code: jrpc2.InternalError, + Message: fmt.Sprintf("error processing ledger %d: %v", ledger.LedgerSequence(), err), + } + } + result = append(result, ledgerInfo) + } + + return result, nil +} + +// parseLedgerInfo extracts and formats the ledger metadata and header information. +func (h ledgersHandler) parseLedgerInfo(ledger xdr.LedgerCloseMeta, format string) (LedgerInfo, error) { + ledgerInfo := LedgerInfo{ + Hash: ledger.LedgerHash().HexString(), + Sequence: ledger.LedgerSequence(), + LedgerCloseTime: ledger.LedgerCloseTime(), + } + + // Format the data according to the requested format (JSON or XDR) + switch format { + case FormatJSON: + var convErr error + ledgerInfo.LedgerMetadataJSON, ledgerInfo.LedgerHeaderJSON, convErr = ledgerToJSON(&ledger) + if convErr != nil { + return ledgerInfo, convErr + } + default: + closeMetaB, err := ledger.MarshalBinary() + if err != nil { + return LedgerInfo{}, fmt.Errorf("error marshaling ledger close meta: %w", err) + } + + headerB, err := ledger.LedgerHeaderHistoryEntry().MarshalBinary() + if err != nil { + return LedgerInfo{}, fmt.Errorf("error marshaling ledger header: %w", err) + } + + ledgerInfo.LedgerMetadata = base64.StdEncoding.EncodeToString(closeMetaB) + ledgerInfo.LedgerHeader = base64.StdEncoding.EncodeToString(headerB) + } + return ledgerInfo, nil +} diff --git a/cmd/soroban-rpc/internal/methods/get_ledgers_test.go b/cmd/soroban-rpc/internal/methods/get_ledgers_test.go new file mode 100644 index 00000000..2d0ae5a4 --- /dev/null +++ b/cmd/soroban-rpc/internal/methods/get_ledgers_test.go @@ -0,0 +1,286 @@ +package methods + +import ( + "context" + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/stellar/go/support/log" + "github.com/stellar/go/xdr" + + "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces" + "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db" +) + +var expectedLedgerInfo = LedgerInfo{ + Hash: "0000000000000000000000000000000000000000000000000000000000000000", + Sequence: 1, + LedgerCloseTime: 125, + LedgerHeader: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB9AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", //nolint:lll + LedgerMetadata: "AAAAAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAH0AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAAAAAAAAAAEAAAAAAAAAAAAAAAEAAAACAAABAIAAAAAAAAAAPww0v5OtDZlx0EzMkPcFURyDiq2XNKSi+w16A/x/6JoAAAABAAAAAP///50AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAGw0LNdyu0BUtYvu6oo7T+kmRyH5+FpqPyiaHsX7ibKLQAAAAAAAABkAAAAAAAAAAAAAAAAAAAAAAAAAAMAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==", //nolint:lll +} + +func setupTestDB(t *testing.T, numLedgers int) *db.DB { + testDB := NewTestDB(t) + daemon := interfaces.MakeNoOpDeamon() + for sequence := 1; sequence <= numLedgers; sequence++ { + ledgerCloseMeta := txMeta(uint32(sequence)-100, true) + tx, err := db.NewReadWriter(log.DefaultLogger, testDB, daemon, 150, 100, passphrase).NewTx(context.Background()) + require.NoError(t, err) + require.NoError(t, tx.LedgerWriter().InsertLedger(ledgerCloseMeta)) + require.NoError(t, tx.Commit(ledgerCloseMeta)) + } + return testDB +} + +func TestGetLedgers_DefaultLimit(t *testing.T) { + testDB := setupTestDB(t, 50) + handler := ledgersHandler{ + ledgerReader: db.NewLedgerReader(testDB), + maxLimit: 100, + defaultLimit: 5, + } + + request := GetLedgersRequest{ + StartLedger: 1, + } + + response, err := handler.getLedgers(context.TODO(), request) + require.NoError(t, err) + + assert.Equal(t, uint32(50), response.LatestLedger) + assert.Equal(t, ledgerCloseTime(50), response.LatestLedgerCloseTime) + assert.Equal(t, "5", response.Cursor) + assert.Len(t, response.Ledgers, 5) + assert.Equal(t, uint32(1), response.Ledgers[0].Sequence) + assert.Equal(t, uint32(5), response.Ledgers[4].Sequence) + + // assert a single ledger info structure for sanity purposes + assert.Equal(t, expectedLedgerInfo, response.Ledgers[0]) +} + +func TestGetLedgers_CustomLimit(t *testing.T) { + testDB := setupTestDB(t, 40) + handler := ledgersHandler{ + ledgerReader: db.NewLedgerReader(testDB), + maxLimit: 100, + defaultLimit: 5, + } + + request := GetLedgersRequest{ + StartLedger: 1, + Pagination: &LedgerPaginationOptions{ + Limit: 50, + }, + } + + response, err := handler.getLedgers(context.TODO(), request) + require.NoError(t, err) + + assert.Equal(t, uint32(40), response.LatestLedger) + assert.Equal(t, "40", response.Cursor) + assert.Len(t, response.Ledgers, 40) + assert.Equal(t, uint32(1), response.Ledgers[0].Sequence) + assert.Equal(t, uint32(40), response.Ledgers[39].Sequence) +} + +func TestGetLedgers_WithCursor(t *testing.T) { + testDB := setupTestDB(t, 10) + handler := ledgersHandler{ + ledgerReader: db.NewLedgerReader(testDB), + maxLimit: 100, + defaultLimit: 5, + } + + request := GetLedgersRequest{ + Pagination: &LedgerPaginationOptions{ + Cursor: "5", + Limit: 3, + }, + } + + response, err := handler.getLedgers(context.TODO(), request) + require.NoError(t, err) + + assert.Equal(t, uint32(10), response.LatestLedger) + assert.Equal(t, "8", response.Cursor) + assert.Len(t, response.Ledgers, 3) + assert.Equal(t, uint32(6), response.Ledgers[0].Sequence) + assert.Equal(t, uint32(8), response.Ledgers[2].Sequence) +} + +func TestGetLedgers_InvalidStartLedger(t *testing.T) { + testDB := setupTestDB(t, 10) + handler := ledgersHandler{ + ledgerReader: db.NewLedgerReader(testDB), + maxLimit: 100, + defaultLimit: 5, + } + + request := GetLedgersRequest{ + StartLedger: 12, + } + + _, err := handler.getLedgers(context.TODO(), request) + require.Error(t, err) + assert.Contains(t, err.Error(), "start ledger must be between") +} + +func TestGetLedgers_LimitExceedsMaxLimit(t *testing.T) { + testDB := setupTestDB(t, 10) + handler := ledgersHandler{ + ledgerReader: db.NewLedgerReader(testDB), + maxLimit: 100, + defaultLimit: 5, + } + + request := GetLedgersRequest{ + StartLedger: 1, + Pagination: &LedgerPaginationOptions{ + Limit: 101, + }, + } + + _, err := handler.getLedgers(context.TODO(), request) + require.Error(t, err) + assert.Contains(t, err.Error(), "limit must not exceed 100") +} + +func TestGetLedgers_InvalidCursor(t *testing.T) { + testDB := setupTestDB(t, 10) + handler := ledgersHandler{ + ledgerReader: db.NewLedgerReader(testDB), + maxLimit: 100, + defaultLimit: 5, + } + + request := GetLedgersRequest{ + Pagination: &LedgerPaginationOptions{ + Cursor: "invalid", + }, + } + + _, err := handler.getLedgers(context.TODO(), request) + require.Error(t, err) + assert.Contains(t, err.Error(), "invalid syntax") +} + +func TestGetLedgers_JSONFormat(t *testing.T) { + testDB := setupTestDB(t, 10) + handler := ledgersHandler{ + ledgerReader: db.NewLedgerReader(testDB), + maxLimit: 100, + defaultLimit: 5, + } + + request := GetLedgersRequest{ + StartLedger: 1, + Format: FormatJSON, + } + + response, err := handler.getLedgers(context.TODO(), request) + require.NoError(t, err) + + assert.NotEmpty(t, response.Ledgers) + ledger := response.Ledgers[0] + + assert.NotEmpty(t, ledger.LedgerHeaderJSON) + assert.Empty(t, ledger.LedgerHeader) + assert.NotEmpty(t, ledger.LedgerMetadataJSON) + assert.Empty(t, ledger.LedgerMetadata) + + var headerJSON map[string]interface{} + err = json.Unmarshal(ledger.LedgerHeaderJSON, &headerJSON) + require.NoError(t, err) + assert.NotEmpty(t, headerJSON) + + var metaJSON map[string]interface{} + err = json.Unmarshal(ledger.LedgerMetadataJSON, &metaJSON) + require.NoError(t, err) + assert.NotEmpty(t, metaJSON) +} + +func TestGetLedgers_NoLedgers(t *testing.T) { + testDB := setupTestDB(t, 0) + handler := ledgersHandler{ + ledgerReader: db.NewLedgerReader(testDB), + maxLimit: 100, + defaultLimit: 5, + } + + request := GetLedgersRequest{ + StartLedger: 1, + } + + _, err := handler.getLedgers(context.TODO(), request) + require.Error(t, err) + assert.Contains(t, err.Error(), "[-32603] DB is empty") +} + +func TestGetLedgers_CursorGreaterThanLatestLedger(t *testing.T) { + testDB := setupTestDB(t, 10) + handler := ledgersHandler{ + ledgerReader: db.NewLedgerReader(testDB), + maxLimit: 100, + defaultLimit: 5, + } + + request := GetLedgersRequest{ + Pagination: &LedgerPaginationOptions{ + Cursor: "15", + }, + } + + _, err := handler.getLedgers(context.TODO(), request) + require.Error(t, err) + assert.Contains(t, err.Error(), "cursor must be between") +} + +func BenchmarkGetLedgers(b *testing.B) { + testDB := setupBenchmarkingDB(b) + handler := ledgersHandler{ + ledgerReader: db.NewLedgerReader(testDB), + maxLimit: 200, + defaultLimit: 5, + } + + request := GetLedgersRequest{ + StartLedger: 1334, + Pagination: &LedgerPaginationOptions{ + Limit: 200, // using the current maximum request limit for getLedgers endpoint + }, + } + + b.ResetTimer() + for range b.N { + response, err := handler.getLedgers(context.TODO(), request) + require.NoError(b, err) + assert.Equal(b, uint32(1334), response.Ledgers[0].Sequence) + assert.Equal(b, uint32(1533), response.Ledgers[199].Sequence) + } +} + +func setupBenchmarkingDB(b *testing.B) *db.DB { + testDB := NewTestDB(b) + logger := log.DefaultLogger + writer := db.NewReadWriter(logger, testDB, interfaces.MakeNoOpDeamon(), + 100, 1_000_000, passphrase) + write, err := writer.NewTx(context.TODO()) + require.NoError(b, err) + + lcms := make([]xdr.LedgerCloseMeta, 0, 100_000) + for i := range cap(lcms) { + lcms = append(lcms, txMeta(uint32(1234+i), i%2 == 0)) + } + + ledgerW, txW := write.LedgerWriter(), write.TransactionWriter() + for _, lcm := range lcms { + require.NoError(b, ledgerW.InsertLedger(lcm)) + require.NoError(b, txW.InsertTransactions(lcm)) + } + require.NoError(b, write.Commit(lcms[len(lcms)-1])) + return testDB +} diff --git a/cmd/soroban-rpc/internal/methods/json.go b/cmd/soroban-rpc/internal/methods/json.go index 739a673a..7f271a49 100644 --- a/cmd/soroban-rpc/internal/methods/json.go +++ b/cmd/soroban-rpc/internal/methods/json.go @@ -58,3 +58,20 @@ func transactionToJSON(tx db.Transaction) ( return result, envelope, resultMeta, nil } + +func ledgerToJSON(meta *xdr.LedgerCloseMeta) ([]byte, []byte, error) { + var err error + var closeMetaJSON, headerJSON []byte + + closeMetaJSON, err = xdr2json.ConvertInterface(*meta) + if err != nil { + return nil, nil, err + } + + headerJSON, err = xdr2json.ConvertInterface(meta.LedgerHeaderHistoryEntry()) + if err != nil { + return nil, nil, err + } + + return closeMetaJSON, headerJSON, nil +}