From 6cf77caee26a48a49599dbb9d8b6ed6fea7c0f60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milo=C5=A1=20=C5=BDivkovi=C4=87?= Date: Mon, 3 Feb 2025 14:59:31 +0100 Subject: [PATCH] feat: skip failed txs (#56) * Skip failed txs * Fix linting issue --- backup/backup.go | 35 ++++++++++++++++++++++++++++++++--- backup/backup_test.go | 27 +++++++++++++++++++++++++++ backup/client/client.go | 4 ++++ backup/client/rpc/rpc.go | 20 +++++++++++++++++++- backup/config.go | 10 ++++++---- backup/mock_test.go | 11 +++++++++++ backup/options.go | 7 +++++++ cmd/backup.go | 20 +++++++++++++++----- 8 files changed, 121 insertions(+), 13 deletions(-) diff --git a/backup/backup.go b/backup/backup.go index 466fb35..4b60382 100644 --- a/backup/backup.go +++ b/backup/backup.go @@ -23,6 +23,7 @@ type Service struct { batchSize uint watchInterval time.Duration // interval for the watch routine + skipFailedTxs bool } // NewService creates a new backup service @@ -105,9 +106,9 @@ func (s *Service) ExecuteBackup(ctx context.Context, cfg Config) error { ) // Fetch current batch - blocks, txErr := s.client.GetBlocks(ctx, batchStart, batchStop) - if txErr != nil { - return fmt.Errorf("unable to fetch block transactions, %w", txErr) + blocks, err := s.client.GetBlocks(ctx, batchStart, batchStop) + if err != nil { + return fmt.Errorf("unable to fetch blocks, %w", err) } // Keep track of the number of fetched blocks & those containing transactions @@ -122,7 +123,35 @@ func (s *Service) ExecuteBackup(ctx context.Context, cfg Config) error { // Iterate over the list of blocks containing transactions for _, block := range blocks { + // Fetch current batch tx results, if any + txResults, err := s.client.GetTxResults(block.Height) + if err != nil { + return fmt.Errorf("unable to fetch tx results, %w", err) + } + + // Sanity check + if len(txResults) != len(block.Txs) { + return fmt.Errorf( + "invalid txs results fetched %d, expected %d", + len(txResults), + len(block.Txs), + ) + } + for i, tx := range block.Txs { + txResult := txResults[i] + + if !txResult.IsOK() && s.skipFailedTxs { + // Skip saving failed transaction + s.logger.Debug( + "Skipping failed tx", + "height", block.Height, + "index", i, + ) + + continue + } + // Write the tx data to the file txData := &gnoland.TxWithMetadata{ Tx: tx, diff --git a/backup/backup_test.go b/backup/backup_test.go index 533215c..5c53805 100644 --- a/backup/backup_test.go +++ b/backup/backup_test.go @@ -11,6 +11,7 @@ import ( "github.com/gnolang/gno/gno.land/pkg/gnoland" "github.com/gnolang/gno/tm2/pkg/amino" + abci "github.com/gnolang/gno/tm2/pkg/bft/abci/types" "github.com/gnolang/gno/tm2/pkg/std" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -173,6 +174,19 @@ func TestBackup_ExecuteBackup_FixedRange(t *testing.T) { return generateBlocks(t, from, to, tCase.txsPerBlock), nil }, + getTxResultsFn: func(_ uint64) ([]*abci.ResponseDeliverTx, error) { + txs := make([]*abci.ResponseDeliverTx, 0, tCase.txsPerBlock) + + for range tCase.txsPerBlock { + txs = append(txs, &abci.ResponseDeliverTx{ + ResponseBase: abci.ResponseBase{ + Error: nil, + }, + }) + } + + return txs, nil + }, } ) @@ -286,6 +300,19 @@ func TestBackup_ExecuteBackup_Watch(t *testing.T) { return generateBlocks(t, from, to, tCase.txsPerBlock), nil }, + getTxResultsFn: func(_ uint64) ([]*abci.ResponseDeliverTx, error) { + txs := make([]*abci.ResponseDeliverTx, 0, tCase.txsPerBlock) + + for range tCase.txsPerBlock { + txs = append(txs, &abci.ResponseDeliverTx{ + ResponseBase: abci.ResponseBase{ + Error: nil, + }, + }) + } + + return txs, nil + }, } ) diff --git a/backup/client/client.go b/backup/client/client.go index a5355c8..dc9fce4 100644 --- a/backup/client/client.go +++ b/backup/client/client.go @@ -1,6 +1,7 @@ package client import ( + abci "github.com/gnolang/gno/tm2/pkg/bft/abci/types" "github.com/gnolang/gno/tm2/pkg/std" "golang.org/x/net/context" ) @@ -14,6 +15,9 @@ type Client interface { // timestamp in milliseconds - in the requested range only if they contain // transactions GetBlocks(ctx context.Context, from, to uint64) ([]*Block, error) + + // GetTxResults returns the block transaction results (if any) + GetTxResults(block uint64) ([]*abci.ResponseDeliverTx, error) } type Block struct { diff --git a/backup/client/rpc/rpc.go b/backup/client/rpc/rpc.go index e6d3394..8526f3f 100644 --- a/backup/client/rpc/rpc.go +++ b/backup/client/rpc/rpc.go @@ -8,6 +8,7 @@ import ( _ "github.com/gnolang/gno/gno.land/pkg/sdk/vm" "github.com/gnolang/gno/tm2/pkg/amino" + abci "github.com/gnolang/gno/tm2/pkg/bft/abci/types" rpcClient "github.com/gnolang/gno/tm2/pkg/bft/rpc/client" ctypes "github.com/gnolang/gno/tm2/pkg/bft/rpc/core/types" "github.com/gnolang/gno/tm2/pkg/std" @@ -121,7 +122,7 @@ func (c *Client) GetBlocks(ctx context.Context, from, to uint64) ([]*client.Bloc // Add block including transactions, timestamp and block height to slice blocks = append(blocks, &client.Block{ - Timestamp: blockRes.Block.Time.UnixMilli(), + Timestamp: blockRes.Block.Time.Unix(), Height: uint64(blockRes.Block.Height), Txs: txs, }) @@ -130,3 +131,20 @@ func (c *Client) GetBlocks(ctx context.Context, from, to uint64) ([]*client.Bloc return blocks, nil } + +func (c *Client) GetTxResults(block uint64) ([]*abci.ResponseDeliverTx, error) { + block64 := int64(block) + + results, err := c.client.BlockResults(&block64) + if err != nil { + return nil, fmt.Errorf("unable to fetch block results, %w", err) + } + + txResults := make([]*abci.ResponseDeliverTx, 0) + + for txIndex, tx := range results.Results.DeliverTxs { + txResults[txIndex] = &tx + } + + return txResults, nil +} diff --git a/backup/config.go b/backup/config.go index e13b01a..05baf31 100644 --- a/backup/config.go +++ b/backup/config.go @@ -14,15 +14,17 @@ type Config struct { ToBlock *uint64 // the right bound for the block range; latest if not specified FromBlock uint64 // the left bound for the block range - Watch bool // flag indicating if incoming tx data should be backed up + Watch bool // flag indicating if incoming tx data should be backed up + SkipFailedTx bool // flag indicating if failed txs should be ignored } // DefaultConfig returns the default backup configuration func DefaultConfig() Config { return Config{ - ToBlock: nil, // to latest block by default - FromBlock: 1, // from genesis + 1 by default - Watch: false, // no tracking by default + ToBlock: nil, // to latest block by default + FromBlock: 1, // from genesis + 1 by default + Watch: false, // no tracking by default + SkipFailedTx: false, // include all txs } } diff --git a/backup/mock_test.go b/backup/mock_test.go index db95c66..6191880 100644 --- a/backup/mock_test.go +++ b/backup/mock_test.go @@ -3,17 +3,20 @@ package backup import ( "context" + abci "github.com/gnolang/gno/tm2/pkg/bft/abci/types" "github.com/gnolang/tx-archive/backup/client" ) type ( getLatestBlockNumberDelegate func() (uint64, error) getBlocksDelegate func(context.Context, uint64, uint64) ([]*client.Block, error) + getTxResultsDelegate func(uint64) ([]*abci.ResponseDeliverTx, error) ) type mockClient struct { getLatestBlockNumberFn getLatestBlockNumberDelegate getBlocksFn getBlocksDelegate + getTxResultsFn getTxResultsDelegate } func (m *mockClient) GetLatestBlockNumber() (uint64, error) { @@ -31,3 +34,11 @@ func (m *mockClient) GetBlocks(ctx context.Context, from, to uint64) ([]*client. return nil, nil } + +func (m *mockClient) GetTxResults(block uint64) ([]*abci.ResponseDeliverTx, error) { + if m.getTxResultsFn != nil { + return m.getTxResultsFn(block) + } + + return nil, nil +} diff --git a/backup/options.go b/backup/options.go index cad6024..afa307a 100644 --- a/backup/options.go +++ b/backup/options.go @@ -17,3 +17,10 @@ func WithBatchSize(size uint) Option { s.batchSize = size } } + +// WithSkipFailedTxs specifies if failed txs should be backed up +func WithSkipFailedTxs(skip bool) Option { + return func(s *Service) { + s.skipFailedTxs = skip + } +} diff --git a/cmd/backup.go b/cmd/backup.go index 0b88b37..4558331 100644 --- a/cmd/backup.go +++ b/cmd/backup.go @@ -41,11 +41,12 @@ type backupCfg struct { fromBlock uint64 batchSize uint - ws bool - overwrite bool - legacy bool - watch bool - verbose bool + ws bool + overwrite bool + legacy bool + watch bool + verbose bool + skipFailedTxs bool } // newBackupCmd creates the backup command @@ -135,6 +136,13 @@ func (c *backupCfg) registerFlags(fs *flag.FlagSet) { false, "flag indicating if the log verbosity should be set to debug level", ) + + fs.BoolVar( + &c.skipFailedTxs, + "skip-failed-txs", + false, + "flag indicating if failed txs should be skipped", + ) } // exec executes the backup command @@ -159,6 +167,7 @@ func (c *backupCfg) exec(ctx context.Context, _ []string) error { cfg := backup.DefaultConfig() cfg.FromBlock = c.fromBlock cfg.Watch = c.watch + cfg.SkipFailedTx = c.skipFailedTxs if c.toBlock >= 0 { to64 := uint64(c.toBlock) @@ -240,6 +249,7 @@ func (c *backupCfg) exec(ctx context.Context, _ []string) error { w, backup.WithLogger(logger), backup.WithBatchSize(c.batchSize), + backup.WithSkipFailedTxs(c.skipFailedTxs), ) // Run the backup service