Skip to content

Commit

Permalink
Merge pull request #320 from multiversx/events-filter-block-number-range
Browse files Browse the repository at this point in the history
add events filter query block number range
  • Loading branch information
dragos-rebegea authored Aug 13, 2024
2 parents 33fe9d5 + 29820c7 commit 4ded91d
Show file tree
Hide file tree
Showing 16 changed files with 232 additions and 167 deletions.
2 changes: 1 addition & 1 deletion bridges/ethMultiversX/bridgeExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ func (executor *bridgeExecutor) addBatchSCMetadata(ctx context.Context, transfer
return nil, ErrNilBatch
}

events, err := executor.ethereumClient.GetBatchSCMetadata(ctx, transfers.ID)
events, err := executor.ethereumClient.GetBatchSCMetadata(ctx, transfers.ID, int64(transfers.BlockNumber))
if err != nil {
return nil, err
}
Expand Down
10 changes: 5 additions & 5 deletions bridges/ethMultiversX/bridgeExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func TestEthToMultiversXBridgeExecutor_GetAndStoreBatchFromEthereum(t *testing.T
assert.Equal(t, providedNonce, nonce)
return expectedBatch, false, nil
},
GetBatchSCMetadataCalled: func(ctx context.Context, nonce uint64) ([]*contract.ERC20SafeERC20SCDeposit, error) {
GetBatchSCMetadataCalled: func(ctx context.Context, nonce uint64, blockNumber int64) ([]*contract.ERC20SafeERC20SCDeposit, error) {
return make([]*contract.ERC20SafeERC20SCDeposit, 0), nil
},
}
Expand Down Expand Up @@ -389,7 +389,7 @@ func TestEthToMultiversXBridgeExecutor_GetAndStoreBatchFromEthereum(t *testing.T
assert.Equal(t, providedNonce, nonce)
return expectedBatch, true, nil
},
GetBatchSCMetadataCalled: func(ctx context.Context, nonce uint64) ([]*contract.ERC20SafeERC20SCDeposit, error) {
GetBatchSCMetadataCalled: func(ctx context.Context, nonce uint64, blockNumber int64) ([]*contract.ERC20SafeERC20SCDeposit, error) {
return make([]*contract.ERC20SafeERC20SCDeposit, 0), nil
},
}
Expand Down Expand Up @@ -421,7 +421,7 @@ func TestEthToMultiversXBridgeExecutor_GetAndStoreBatchFromEthereum(t *testing.T
assert.Equal(t, providedNonce, nonce)
return expectedBatch, true, nil
},
GetBatchSCMetadataCalled: func(ctx context.Context, nonce uint64) ([]*contract.ERC20SafeERC20SCDeposit, error) {
GetBatchSCMetadataCalled: func(ctx context.Context, nonce uint64, blockNumber int64) ([]*contract.ERC20SafeERC20SCDeposit, error) {
return []*contract.ERC20SafeERC20SCDeposit{{
DepositNonce: big.NewInt(0).SetUint64(depositNonce),
CallData: depositData,
Expand Down Expand Up @@ -461,7 +461,7 @@ func TestEthToMultiversXBridgeExecutor_GetAndStoreBatchFromEthereum(t *testing.T
assert.Equal(t, providedNonce, nonce)
return expectedBatch, true, nil
},
GetBatchSCMetadataCalled: func(ctx context.Context, nonce uint64) ([]*contract.ERC20SafeERC20SCDeposit, error) {
GetBatchSCMetadataCalled: func(ctx context.Context, nonce uint64, blockNumber int64) ([]*contract.ERC20SafeERC20SCDeposit, error) {
return []*contract.ERC20SafeERC20SCDeposit{{
DepositNonce: big.NewInt(0).SetUint64(depositNonce),
CallData: depositData,
Expand Down Expand Up @@ -494,7 +494,7 @@ func TestEthToMultiversXBridgeExecutor_GetAndStoreBatchFromEthereum(t *testing.T
assert.Equal(t, providedNonce, nonce)
return expectedBatch, true, nil
},
GetBatchSCMetadataCalled: func(ctx context.Context, nonce uint64) ([]*contract.ERC20SafeERC20SCDeposit, error) {
GetBatchSCMetadataCalled: func(ctx context.Context, nonce uint64, blockNumber int64) ([]*contract.ERC20SafeERC20SCDeposit, error) {
return []*contract.ERC20SafeERC20SCDeposit{{
DepositNonce: big.NewInt(0).SetUint64(depositNonce),
CallData: depositData,
Expand Down
2 changes: 1 addition & 1 deletion bridges/ethMultiversX/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type EthereumClient interface {
GetTransactionsStatuses(ctx context.Context, batchId uint64) ([]byte, error)
GetQuorumSize(ctx context.Context) (*big.Int, error)
IsQuorumReached(ctx context.Context, msgHash common.Hash) (bool, error)
GetBatchSCMetadata(ctx context.Context, nonce uint64) ([]*contract.ERC20SafeERC20SCDeposit, error)
GetBatchSCMetadata(ctx context.Context, nonce uint64, blockNumber int64) ([]*contract.ERC20SafeERC20SCDeposit, error)
CheckClientAvailability(ctx context.Context) error
CheckRequiredBalance(ctx context.Context, erc20Address common.Address, value *big.Int) error
TotalBalances(ctx context.Context, token common.Address) (*big.Int, error)
Expand Down
113 changes: 63 additions & 50 deletions clients/ethereum/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,43 +23,47 @@ import (
)

const (
messagePrefix = "\u0019Ethereum Signed Message:\n32"
minQuorumValue = uint64(1)
minAllowedDelta = 1
messagePrefix = "\u0019Ethereum Signed Message:\n32"
minQuorumValue = uint64(1)
minClientAvailabilityAllowDelta = 1
)

// ArgsEthereumClient is the DTO used in the ethereum's client constructor
type ArgsEthereumClient struct {
ClientWrapper ClientWrapper
Erc20ContractsHandler Erc20ContractsHolder
Log chainCore.Logger
AddressConverter core.AddressConverter
Broadcaster Broadcaster
PrivateKey *ecdsa.PrivateKey
TokensMapper TokensMapper
SignatureHolder SignaturesHolder
SafeContractAddress common.Address
GasHandler GasHandler
TransferGasLimitBase uint64
TransferGasLimitForEach uint64
AllowDelta uint64
ClientWrapper ClientWrapper
Erc20ContractsHandler Erc20ContractsHolder
Log chainCore.Logger
AddressConverter core.AddressConverter
Broadcaster Broadcaster
PrivateKey *ecdsa.PrivateKey
TokensMapper TokensMapper
SignatureHolder SignaturesHolder
SafeContractAddress common.Address
GasHandler GasHandler
TransferGasLimitBase uint64
TransferGasLimitForEach uint64
ClientAvailabilityAllowDelta uint64
EventsBlockRangeFrom int64
EventsBlockRangeTo int64
}

type client struct {
clientWrapper ClientWrapper
erc20ContractsHandler Erc20ContractsHolder
log chainCore.Logger
addressConverter core.AddressConverter
broadcaster Broadcaster
privateKey *ecdsa.PrivateKey
publicKey *ecdsa.PublicKey
tokensMapper TokensMapper
signatureHolder SignaturesHolder
safeContractAddress common.Address
gasHandler GasHandler
transferGasLimitBase uint64
transferGasLimitForEach uint64
allowDelta uint64
clientWrapper ClientWrapper
erc20ContractsHandler Erc20ContractsHolder
log chainCore.Logger
addressConverter core.AddressConverter
broadcaster Broadcaster
privateKey *ecdsa.PrivateKey
publicKey *ecdsa.PublicKey
tokensMapper TokensMapper
signatureHolder SignaturesHolder
safeContractAddress common.Address
gasHandler GasHandler
transferGasLimitBase uint64
transferGasLimitForEach uint64
clientAvailabilityAllowDelta uint64
eventsBlockRangeFrom int64
eventsBlockRangeTo int64

lastBlockNumber uint64
retriesAvailabilityCheck uint64
Expand All @@ -80,20 +84,22 @@ func NewEthereumClient(args ArgsEthereumClient) (*client, error) {
}

c := &client{
clientWrapper: args.ClientWrapper,
erc20ContractsHandler: args.Erc20ContractsHandler,
log: args.Log,
addressConverter: args.AddressConverter,
broadcaster: args.Broadcaster,
privateKey: args.PrivateKey,
publicKey: publicKeyECDSA,
tokensMapper: args.TokensMapper,
signatureHolder: args.SignatureHolder,
safeContractAddress: args.SafeContractAddress,
gasHandler: args.GasHandler,
transferGasLimitBase: args.TransferGasLimitBase,
transferGasLimitForEach: args.TransferGasLimitForEach,
allowDelta: args.AllowDelta,
clientWrapper: args.ClientWrapper,
erc20ContractsHandler: args.Erc20ContractsHandler,
log: args.Log,
addressConverter: args.AddressConverter,
broadcaster: args.Broadcaster,
privateKey: args.PrivateKey,
publicKey: publicKeyECDSA,
tokensMapper: args.TokensMapper,
signatureHolder: args.SignatureHolder,
safeContractAddress: args.SafeContractAddress,
gasHandler: args.GasHandler,
transferGasLimitBase: args.TransferGasLimitBase,
transferGasLimitForEach: args.TransferGasLimitForEach,
clientAvailabilityAllowDelta: args.ClientAvailabilityAllowDelta,
eventsBlockRangeFrom: args.EventsBlockRangeFrom,
eventsBlockRangeTo: args.EventsBlockRangeTo,
}

c.log.Info("NewEthereumClient",
Expand Down Expand Up @@ -137,9 +143,13 @@ func checkArgs(args ArgsEthereumClient) error {
if args.TransferGasLimitForEach == 0 {
return errInvalidGasLimit
}
if args.AllowDelta < minAllowedDelta {
if args.ClientAvailabilityAllowDelta < minClientAvailabilityAllowDelta {
return fmt.Errorf("%w for args.AllowedDelta, got: %d, minimum: %d",
clients.ErrInvalidValue, args.AllowDelta, minAllowedDelta)
clients.ErrInvalidValue, args.ClientAvailabilityAllowDelta, minClientAvailabilityAllowDelta)
}
if args.EventsBlockRangeFrom > args.EventsBlockRangeTo {
return fmt.Errorf("%w, args.EventsBlockRangeFrom: %d, args.EventsBlockRangeTo: %d",
clients.ErrInvalidValue, args.EventsBlockRangeFrom, args.EventsBlockRangeTo)
}
return nil
}
Expand All @@ -162,8 +172,9 @@ func (c *client) GetBatch(ctx context.Context, nonce uint64) (*bridgeCommon.Tran
}

transferBatch := &bridgeCommon.TransferBatch{
ID: batch.Nonce.Uint64(),
Deposits: make([]*bridgeCommon.DepositTransfer, 0, batch.DepositsCount),
ID: batch.Nonce.Uint64(),
BlockNumber: batch.BlockNumber,
Deposits: make([]*bridgeCommon.DepositTransfer, 0, batch.DepositsCount),
}
cachedTokens := make(map[string][]byte)
for i := range deposits {
Expand Down Expand Up @@ -202,7 +213,7 @@ func (c *client) GetBatch(ctx context.Context, nonce uint64) (*bridgeCommon.Tran
}

// GetBatchSCMetadata returns the emitted logs in a batch that hold metadata for SC execution on MVX
func (c *client) GetBatchSCMetadata(ctx context.Context, nonce uint64) ([]*contract.ERC20SafeERC20SCDeposit, error) {
func (c *client) GetBatchSCMetadata(ctx context.Context, nonce uint64, blockNumber int64) ([]*contract.ERC20SafeERC20SCDeposit, error) {
scExecAbi, err := contract.ERC20SafeMetaData.GetAbi()
if err != nil {
return nil, err
Expand All @@ -214,6 +225,8 @@ func (c *client) GetBatchSCMetadata(ctx context.Context, nonce uint64) ([]*contr
{scExecAbi.Events["ERC20SCDeposit"].ID},
{common.BytesToHash(new(big.Int).SetUint64(nonce).Bytes())},
},
FromBlock: big.NewInt(blockNumber + c.eventsBlockRangeFrom),
ToBlock: big.NewInt(blockNumber + c.eventsBlockRangeTo),
}

logs, err := c.clientWrapper.FilterLogs(ctx, query)
Expand Down Expand Up @@ -401,7 +414,7 @@ func (c *client) CheckClientAvailability(ctx context.Context) error {
// if we reached this point we will need to increment the retries counter
defer c.incrementRetriesAvailabilityCheck()

if c.retriesAvailabilityCheck > c.allowDelta {
if c.retriesAvailabilityCheck > c.clientAvailabilityAllowDelta {
message := fmt.Sprintf("block %d fetched for %d times in a row", currentBlock, c.retriesAvailabilityCheck)
c.setStatusForAvailabilityCheck(ethmultiversx.Unavailable, message, currentBlock)

Expand Down
44 changes: 30 additions & 14 deletions clients/ethereum/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,14 @@ func createMockEthereumClientArgs() ArgsEthereumClient {
return append([]byte("ERC20"), sourceBytes...), nil
},
},
SignatureHolder: &testsCommon.SignaturesHolderStub{},
SafeContractAddress: testsCommon.CreateRandomEthereumAddress(),
GasHandler: &testsCommon.GasHandlerStub{},
TransferGasLimitBase: 50,
TransferGasLimitForEach: 20,
AllowDelta: 5,
SignatureHolder: &testsCommon.SignaturesHolderStub{},
SafeContractAddress: testsCommon.CreateRandomEthereumAddress(),
GasHandler: &testsCommon.GasHandlerStub{},
TransferGasLimitBase: 50,
TransferGasLimitForEach: 20,
ClientAvailabilityAllowDelta: 5,
EventsBlockRangeFrom: -100,
EventsBlockRangeTo: 400,
}
}

Expand Down Expand Up @@ -185,18 +187,32 @@ func TestNewEthereumClient(t *testing.T) {
assert.Equal(t, errInvalidGasLimit, err)
assert.True(t, check.IfNil(c))
})
t.Run("invalid AllowDelta should error", func(t *testing.T) {
t.Run("invalid ClientAvailabilityAllowDelta should error", func(t *testing.T) {
t.Parallel()

args := createMockEthereumClientArgs()
args.AllowDelta = 0
args.ClientAvailabilityAllowDelta = 0

c, err := NewEthereumClient(args)

assert.True(t, check.IfNil(c))
assert.True(t, errors.Is(err, clients.ErrInvalidValue))
assert.True(t, strings.Contains(err.Error(), "for args.AllowedDelta"))
})
t.Run("invalid events block range from should error", func(t *testing.T) {
t.Parallel()

args := createMockEthereumClientArgs()
args.EventsBlockRangeFrom = 100
args.EventsBlockRangeTo = 50

c, err := NewEthereumClient(args)

assert.True(t, check.IfNil(c))
assert.True(t, errors.Is(err, clients.ErrInvalidValue))
assert.True(t, strings.Contains(err.Error(), "args.EventsBlockRangeFrom"))
assert.True(t, strings.Contains(err.Error(), "args.EventsBlockRangeTo"))
})
t.Run("should work", func(t *testing.T) {
args := createMockEthereumClientArgs()
c, err := NewEthereumClient(args)
Expand Down Expand Up @@ -1144,14 +1160,14 @@ func TestClient_CheckClientAvailability(t *testing.T) {
statusHandler.SetStringMetric(bridgeCore.MetricLastMultiversXClientError, "random")

// this will just increment the retry counter
for i := 0; i < int(args.AllowDelta); i++ {
for i := 0; i < int(args.ClientAvailabilityAllowDelta); i++ {
err := c.CheckClientAvailability(context.Background())
assert.Nil(t, err)
checkStatusHandler(t, statusHandler, ethmultiversx.Available, "")
}

for i := 0; i < 10; i++ {
message := fmt.Sprintf("block %d fetched for %d times in a row", currentNonce, args.AllowDelta+uint64(i+1))
message := fmt.Sprintf("block %d fetched for %d times in a row", currentNonce, args.ClientAvailabilityAllowDelta+uint64(i+1))
err := c.CheckClientAvailability(context.Background())
assert.Nil(t, err)
checkStatusHandler(t, statusHandler, ethmultiversx.Unavailable, message)
Expand All @@ -1164,14 +1180,14 @@ func TestClient_CheckClientAvailability(t *testing.T) {
incrementor = 0

// this will just increment the retry counter
for i := 0; i < int(args.AllowDelta); i++ {
for i := 0; i < int(args.ClientAvailabilityAllowDelta); i++ {
err := c.CheckClientAvailability(context.Background())
assert.Nil(t, err)
checkStatusHandler(t, statusHandler, ethmultiversx.Available, "")
}

for i := 0; i < 10; i++ {
message := fmt.Sprintf("block %d fetched for %d times in a row", currentNonce, args.AllowDelta+uint64(i+1))
message := fmt.Sprintf("block %d fetched for %d times in a row", currentNonce, args.ClientAvailabilityAllowDelta+uint64(i+1))
err := c.CheckClientAvailability(context.Background())
assert.Nil(t, err)
checkStatusHandler(t, statusHandler, ethmultiversx.Unavailable, message)
Expand Down Expand Up @@ -1211,7 +1227,7 @@ func TestClient_GetBatchSCMetadata(t *testing.T) {
},
}
c, _ := NewEthereumClient(args)
batch, err := c.GetBatchSCMetadata(context.Background(), 0)
batch, err := c.GetBatchSCMetadata(context.Background(), 0, 0)
assert.Nil(t, batch)
assert.Equal(t, expectedErr, err)
})
Expand Down Expand Up @@ -1239,7 +1255,7 @@ func TestClient_GetBatchSCMetadata(t *testing.T) {
},
}
c, _ := NewEthereumClient(args)
batch, err := c.GetBatchSCMetadata(context.Background(), expectedEvent.BatchId.Uint64())
batch, err := c.GetBatchSCMetadata(context.Background(), expectedEvent.BatchId.Uint64(), 500)

assert.Nil(t, err)
assert.Equal(t, 1, len(batch))
Expand Down
Loading

0 comments on commit 4ded91d

Please sign in to comment.