Skip to content

Commit

Permalink
feat: skip failed txs (#56)
Browse files Browse the repository at this point in the history
* Skip failed txs

* Fix linting issue
  • Loading branch information
zivkovicmilos authored Feb 3, 2025
1 parent 584041c commit 6cf77ca
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 13 deletions.
35 changes: 32 additions & 3 deletions backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Service struct {

batchSize uint
watchInterval time.Duration // interval for the watch routine
skipFailedTxs bool
}

// NewService creates a new backup service
Expand Down Expand Up @@ -105,9 +106,9 @@ func (s *Service) ExecuteBackup(ctx context.Context, cfg Config) error {
)

// Fetch current batch
blocks, txErr := s.client.GetBlocks(ctx, batchStart, batchStop)
if txErr != nil {
return fmt.Errorf("unable to fetch block transactions, %w", txErr)
blocks, err := s.client.GetBlocks(ctx, batchStart, batchStop)
if err != nil {
return fmt.Errorf("unable to fetch blocks, %w", err)
}

// Keep track of the number of fetched blocks & those containing transactions
Expand All @@ -122,7 +123,35 @@ func (s *Service) ExecuteBackup(ctx context.Context, cfg Config) error {

// Iterate over the list of blocks containing transactions
for _, block := range blocks {
// Fetch current batch tx results, if any
txResults, err := s.client.GetTxResults(block.Height)
if err != nil {
return fmt.Errorf("unable to fetch tx results, %w", err)
}

// Sanity check
if len(txResults) != len(block.Txs) {
return fmt.Errorf(
"invalid txs results fetched %d, expected %d",
len(txResults),
len(block.Txs),
)
}

for i, tx := range block.Txs {
txResult := txResults[i]

if !txResult.IsOK() && s.skipFailedTxs {
// Skip saving failed transaction
s.logger.Debug(
"Skipping failed tx",
"height", block.Height,
"index", i,
)

continue
}

// Write the tx data to the file
txData := &gnoland.TxWithMetadata{
Tx: tx,
Expand Down
27 changes: 27 additions & 0 deletions backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/gnolang/gno/gno.land/pkg/gnoland"
"github.com/gnolang/gno/tm2/pkg/amino"
abci "github.com/gnolang/gno/tm2/pkg/bft/abci/types"
"github.com/gnolang/gno/tm2/pkg/std"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -173,6 +174,19 @@ func TestBackup_ExecuteBackup_FixedRange(t *testing.T) {

return generateBlocks(t, from, to, tCase.txsPerBlock), nil
},
getTxResultsFn: func(_ uint64) ([]*abci.ResponseDeliverTx, error) {
txs := make([]*abci.ResponseDeliverTx, 0, tCase.txsPerBlock)

for range tCase.txsPerBlock {
txs = append(txs, &abci.ResponseDeliverTx{
ResponseBase: abci.ResponseBase{
Error: nil,
},
})
}

return txs, nil
},
}
)

Expand Down Expand Up @@ -286,6 +300,19 @@ func TestBackup_ExecuteBackup_Watch(t *testing.T) {

return generateBlocks(t, from, to, tCase.txsPerBlock), nil
},
getTxResultsFn: func(_ uint64) ([]*abci.ResponseDeliverTx, error) {
txs := make([]*abci.ResponseDeliverTx, 0, tCase.txsPerBlock)

for range tCase.txsPerBlock {
txs = append(txs, &abci.ResponseDeliverTx{
ResponseBase: abci.ResponseBase{
Error: nil,
},
})
}

return txs, nil
},
}
)

Expand Down
4 changes: 4 additions & 0 deletions backup/client/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package client

import (
abci "github.com/gnolang/gno/tm2/pkg/bft/abci/types"
"github.com/gnolang/gno/tm2/pkg/std"
"golang.org/x/net/context"
)
Expand All @@ -14,6 +15,9 @@ type Client interface {
// timestamp in milliseconds - in the requested range only if they contain
// transactions
GetBlocks(ctx context.Context, from, to uint64) ([]*Block, error)

// GetTxResults returns the block transaction results (if any)
GetTxResults(block uint64) ([]*abci.ResponseDeliverTx, error)
}

type Block struct {
Expand Down
20 changes: 19 additions & 1 deletion backup/client/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

_ "github.com/gnolang/gno/gno.land/pkg/sdk/vm"
"github.com/gnolang/gno/tm2/pkg/amino"
abci "github.com/gnolang/gno/tm2/pkg/bft/abci/types"
rpcClient "github.com/gnolang/gno/tm2/pkg/bft/rpc/client"
ctypes "github.com/gnolang/gno/tm2/pkg/bft/rpc/core/types"
"github.com/gnolang/gno/tm2/pkg/std"
Expand Down Expand Up @@ -121,7 +122,7 @@ func (c *Client) GetBlocks(ctx context.Context, from, to uint64) ([]*client.Bloc

// Add block including transactions, timestamp and block height to slice
blocks = append(blocks, &client.Block{
Timestamp: blockRes.Block.Time.UnixMilli(),
Timestamp: blockRes.Block.Time.Unix(),
Height: uint64(blockRes.Block.Height),
Txs: txs,
})
Expand All @@ -130,3 +131,20 @@ func (c *Client) GetBlocks(ctx context.Context, from, to uint64) ([]*client.Bloc

return blocks, nil
}

func (c *Client) GetTxResults(block uint64) ([]*abci.ResponseDeliverTx, error) {
block64 := int64(block)

results, err := c.client.BlockResults(&block64)
if err != nil {
return nil, fmt.Errorf("unable to fetch block results, %w", err)
}

txResults := make([]*abci.ResponseDeliverTx, 0)

for txIndex, tx := range results.Results.DeliverTxs {
txResults[txIndex] = &tx
}

return txResults, nil
}
10 changes: 6 additions & 4 deletions backup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ type Config struct {
ToBlock *uint64 // the right bound for the block range; latest if not specified
FromBlock uint64 // the left bound for the block range

Watch bool // flag indicating if incoming tx data should be backed up
Watch bool // flag indicating if incoming tx data should be backed up
SkipFailedTx bool // flag indicating if failed txs should be ignored
}

// DefaultConfig returns the default backup configuration
func DefaultConfig() Config {
return Config{
ToBlock: nil, // to latest block by default
FromBlock: 1, // from genesis + 1 by default
Watch: false, // no tracking by default
ToBlock: nil, // to latest block by default
FromBlock: 1, // from genesis + 1 by default
Watch: false, // no tracking by default
SkipFailedTx: false, // include all txs
}
}

Expand Down
11 changes: 11 additions & 0 deletions backup/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@ package backup
import (
"context"

abci "github.com/gnolang/gno/tm2/pkg/bft/abci/types"
"github.com/gnolang/tx-archive/backup/client"
)

type (
getLatestBlockNumberDelegate func() (uint64, error)
getBlocksDelegate func(context.Context, uint64, uint64) ([]*client.Block, error)
getTxResultsDelegate func(uint64) ([]*abci.ResponseDeliverTx, error)
)

type mockClient struct {
getLatestBlockNumberFn getLatestBlockNumberDelegate
getBlocksFn getBlocksDelegate
getTxResultsFn getTxResultsDelegate
}

func (m *mockClient) GetLatestBlockNumber() (uint64, error) {
Expand All @@ -31,3 +34,11 @@ func (m *mockClient) GetBlocks(ctx context.Context, from, to uint64) ([]*client.

return nil, nil
}

func (m *mockClient) GetTxResults(block uint64) ([]*abci.ResponseDeliverTx, error) {
if m.getTxResultsFn != nil {
return m.getTxResultsFn(block)
}

return nil, nil
}
7 changes: 7 additions & 0 deletions backup/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,10 @@ func WithBatchSize(size uint) Option {
s.batchSize = size
}
}

// WithSkipFailedTxs specifies if failed txs should be backed up
func WithSkipFailedTxs(skip bool) Option {
return func(s *Service) {
s.skipFailedTxs = skip
}
}
20 changes: 15 additions & 5 deletions cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ type backupCfg struct {
fromBlock uint64
batchSize uint

ws bool
overwrite bool
legacy bool
watch bool
verbose bool
ws bool
overwrite bool
legacy bool
watch bool
verbose bool
skipFailedTxs bool
}

// newBackupCmd creates the backup command
Expand Down Expand Up @@ -135,6 +136,13 @@ func (c *backupCfg) registerFlags(fs *flag.FlagSet) {
false,
"flag indicating if the log verbosity should be set to debug level",
)

fs.BoolVar(
&c.skipFailedTxs,
"skip-failed-txs",
false,
"flag indicating if failed txs should be skipped",
)
}

// exec executes the backup command
Expand All @@ -159,6 +167,7 @@ func (c *backupCfg) exec(ctx context.Context, _ []string) error {
cfg := backup.DefaultConfig()
cfg.FromBlock = c.fromBlock
cfg.Watch = c.watch
cfg.SkipFailedTx = c.skipFailedTxs

if c.toBlock >= 0 {
to64 := uint64(c.toBlock)
Expand Down Expand Up @@ -240,6 +249,7 @@ func (c *backupCfg) exec(ctx context.Context, _ []string) error {
w,
backup.WithLogger(logger),
backup.WithBatchSize(c.batchSize),
backup.WithSkipFailedTxs(c.skipFailedTxs),
)

// Run the backup service
Expand Down

0 comments on commit 6cf77ca

Please sign in to comment.