Skip to content

Commit

Permalink
fix: Integration Bali PP (#198)
Browse files Browse the repository at this point in the history
- Issues a long integration: 
- Wrong process of unknown error field `PError` on certificate header
- Fix use of aggsender log (to be able to filter by module)
- Fix a bad condition to show a warning of a reopen certificate
- Improved test for epoch notifier
- Fix certificate on local DB but AggLayer have none certificate (CDK-604)
- Add logs to `synv/evmdownloader.go` to have more information on error: `error calling FilterLogs to eth client: invalid params`
  • Loading branch information
joanestebanr authored Nov 26, 2024
1 parent e93e1b2 commit 1dfea4a
Show file tree
Hide file tree
Showing 16 changed files with 996 additions and 24 deletions.
16 changes: 15 additions & 1 deletion agglayer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,15 @@ func (c *ImportedBridgeExit) Hash() common.Hash {
)
}

type GenericPPError struct {
Key string
Value string
}

func (p *GenericPPError) String() string {
return fmt.Sprintf("Generic error: %s: %s", p.Key, p.Value)
}

// CertificateHeader is the structure returned by the interop_getCertificateHeader RPC call
type CertificateHeader struct {
NetworkID uint32 `json:"network_id"`
Expand Down Expand Up @@ -654,7 +663,12 @@ func (c *CertificateHeader) UnmarshalJSON(data []byte) error {

ppError = p
default:
return fmt.Errorf("invalid error type: %s", key)
valueStr, err := json.Marshal(value)
if err != nil {
ppError = &GenericPPError{Key: key, Value: "error marshalling value"}
} else {
ppError = &GenericPPError{Key: key, Value: string(valueStr)}
}
}
}

Expand Down
16 changes: 16 additions & 0 deletions agglayer/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ const (
expectedSignedCertificateyMetadataJSON = `{"network_id":1,"height":1,"prev_local_exit_root":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"new_local_exit_root":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"bridge_exits":[{"leaf_type":"Transfer","token_info":null,"dest_network":0,"dest_address":"0x0000000000000000000000000000000000000000","amount":"1","metadata":[1,2,3]}],"imported_bridge_exits":[{"bridge_exit":{"leaf_type":"Transfer","token_info":null,"dest_network":0,"dest_address":"0x0000000000000000000000000000000000000000","amount":"1","metadata":[]},"claim_data":null,"global_index":{"mainnet_flag":false,"rollup_index":1,"leaf_index":1}}],"metadata":"0x0000000000000000000000000000000000000000000000000000000000000000","signature":{"r":"0x0000000000000000000000000000000000000000000000000000000000000000","s":"0x0000000000000000000000000000000000000000000000000000000000000000","odd_y_parity":false}}`
)

func TestMGenericPPError(t *testing.T) {
err := GenericPPError{"test", "value"}
require.Equal(t, "Generic error: test: value", err.String())
}

func TestMarshalJSON(t *testing.T) {
cert := SignedCertificate{
Certificate: &Certificate{
Expand Down Expand Up @@ -251,3 +256,14 @@ func TestGlobalIndex_UnmarshalFromMap(t *testing.T) {
})
}
}

func TestUnmarshalCertificateHeaderUnknownError(t *testing.T) {
str := "{\"network_id\":14,\"height\":0,\"epoch_number\":null,\"certificate_index\":null,\"certificate_id\":\"0x3af88c9ca106822bd141fdc680dcb888f4e9d4997fad1645ba3d5d747059eb32\",\"new_local_exit_root\":\"0x625e889ced3c31277c6653229096374d396a2fd3564a8894aaad2ff935d2fc8c\",\"metadata\":\"0x0000000000000000000000000000000000000000000000000000000000002f3d\",\"status\":{\"InError\":{\"error\":{\"ProofVerificationFailed\":{\"Plonk\":\"the verifying key does not match the inner plonk bn254 proof's committed verifying key\"}}}}}"
data := []byte(str)
var result *CertificateHeader
err := json.Unmarshal(data, &result)
require.NoError(t, err)
require.NotNil(t, result)
ppError := result.Error.String()
require.Equal(t, `Generic error: ProofVerificationFailed: {"Plonk":"the verifying key does not match the inner plonk bn254 proof's committed verifying key"}`, ppError)
}
16 changes: 10 additions & 6 deletions aggsender/aggsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ func (a *AggSender) checkInitialStatus(ctx context.Context) {

for {
if err := a.checkLastCertificateFromAgglayer(ctx); err != nil {
log.Errorf("error checking initial status: %w, retrying in %s", err, a.cfg.DelayBeetweenRetries.String())
a.log.Errorf("error checking initial status: %w, retrying in %s", err, a.cfg.DelayBeetweenRetries.String())
} else {
log.Info("Initial status checked successfully")
a.log.Info("Initial status checked successfully")
return
}
select {
Expand All @@ -116,7 +116,7 @@ func (a *AggSender) sendCertificates(ctx context.Context) {
thereArePendingCerts := a.checkPendingCertificatesStatus(ctx)
if !thereArePendingCerts {
if _, err := a.sendCertificate(ctx); err != nil {
log.Error(err)
a.log.Error(err)
}
} else {
log.Infof("Skipping epoch %s because there are pending certificates",
Expand Down Expand Up @@ -202,7 +202,6 @@ func (a *AggSender) sendCertificate(ctx context.Context) (*agglayer.SignedCertif

a.saveCertificateToFile(signedCertificate)
a.log.Infof("certificate ready to be send to AggLayer: %s", signedCertificate.String())

certificateHash, err := a.aggLayerClient.SendCertificate(signedCertificate)
if err != nil {
return nil, fmt.Errorf("error sending certificate: %w", err)
Expand Down Expand Up @@ -584,7 +583,7 @@ func (a *AggSender) updateCertificateStatus(ctx context.Context,
localCert.String())

// That is a strange situation
if agglayerCert.Status.IsOpen() == localCert.Status.IsClosed() {
if agglayerCert.Status.IsOpen() && localCert.Status.IsClosed() {
a.log.Warnf("certificate %s is reopen! from [%s] to [%s]",
localCert.ID(), localCert.Status, agglayerCert.Status)
}
Expand Down Expand Up @@ -638,6 +637,11 @@ func (a *AggSender) checkLastCertificateFromAgglayer(ctx context.Context) error
}
return nil
}
// CASE 2.1: certificate in storage but not in agglayer
// this is a non-sense, so thrown an error
if localLastCert != nil && aggLayerLastCert == nil {
return fmt.Errorf("recovery: certificate in storage but not in agglayer. Inconsistency")
}
// CASE 3: aggsender stopped between sending to agglayer and storing on DB
if aggLayerLastCert.Height == localLastCert.Height+1 {
a.log.Infof("recovery: AggLayer have next cert (height:%d), so is a recovery case: storing cert: %s",
Expand Down Expand Up @@ -673,7 +677,7 @@ func (a *AggSender) checkLastCertificateFromAgglayer(ctx context.Context) error
func (a *AggSender) updateLocalStorageWithAggLayerCert(ctx context.Context,
aggLayerCert *agglayer.CertificateHeader) (*types.CertificateInfo, error) {
certInfo := NewCertificateInfoFromAgglayerCertHeader(aggLayerCert)
log.Infof("setting initial certificate from AggLayer: %s", certInfo.String())
a.log.Infof("setting initial certificate from AggLayer: %s", certInfo.String())
return certInfo, a.storage.SaveLastSentCertificate(ctx, *certInfo)
}

Expand Down
13 changes: 13 additions & 0 deletions aggsender/aggsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1717,6 +1717,19 @@ func TestCheckLastCertificateFromAgglayer_Case2NoCertLocalCertRemoteErrorStorage
require.Error(t, err)
}

// CASE 2.1: certificate in storage but not in agglayer
// sub case of previous one that fails to update local storage
func TestCheckLastCertificateFromAgglayer_Case2_1NoCertRemoteButCertLocal(t *testing.T) {
testData := newAggsenderTestData(t, testDataFlagMockStorage)
testData.l2syncerMock.EXPECT().OriginNetwork().Return(networkIDTest).Once()
testData.agglayerClientMock.EXPECT().GetLatestKnownCertificateHeader(networkIDTest).
Return(nil, nil).Once()
testData.storageMock.EXPECT().GetLastSentCertificate().Return(&testData.testCerts[0], nil)
err := testData.sut.checkLastCertificateFromAgglayer(testData.ctx)

require.Error(t, err)
}

// CASE 3: AggSender and AggLayer not same certificateID. AggLayer has a new certificate
func TestCheckLastCertificateFromAgglayer_Case3Mismatch(t *testing.T) {
testData := newAggsenderTestData(t, testDataFlagMockStorage)
Expand Down
2 changes: 1 addition & 1 deletion aggsender/block_notifier_polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (b *BlockNotifierPolling) step(ctx context.Context,
newState := previousState.incommingNewBlock(currentBlock.Number.Uint64())
b.logger.Debugf("New block seen [finality:%s]: %d. blockRate:%s",
b.config.BlockFinalityType, currentBlock.Number.Uint64(), newState.previousBlockTime)

eventToEmit.BlockRate = *newState.previousBlockTime
return b.nextBlockRequestDelay(newState, nil), newState, eventToEmit
}

Expand Down
21 changes: 17 additions & 4 deletions aggsender/epoch_notifier_per_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ type ConfigEpochNotifierPerBlock struct {
EpochNotificationPercentage uint
}

func (c *ConfigEpochNotifierPerBlock) String() string {
if c == nil {
return "nil"
}
return fmt.Sprintf("{startEpochBlock=%d, sizeEpoch=%d, threshold=%d%%}",
c.StartingEpochBlock, c.NumBlockPerEpoch, c.EpochNotificationPercentage)
}

func NewConfigEpochNotifierPerBlock(aggLayer agglayer.AggLayerClientGetEpochConfiguration,
epochNotificationPercentage uint) (*ConfigEpochNotifierPerBlock, error) {
if aggLayer == nil {
Expand Down Expand Up @@ -89,9 +97,7 @@ func NewEpochNotifierPerBlock(blockNotifier types.BlockNotifier,
}

func (e *EpochNotifierPerBlock) String() string {
return fmt.Sprintf("EpochNotifierPerBlock: startingEpochBlock=%d, numBlockPerEpoch=%d,"+
" EpochNotificationPercentage=%d",
e.Config.StartingEpochBlock, e.Config.NumBlockPerEpoch, e.Config.EpochNotificationPercentage)
return fmt.Sprintf("EpochNotifierPerBlock: config: %s", e.Config.String())
}

// StartAsync starts the notifier in a goroutine
Expand Down Expand Up @@ -147,6 +153,14 @@ func (e *EpochNotifierPerBlock) step(status internalStatus,
status.lastBlockSeen = currentBlock

needNotify, closingEpoch := e.isNotificationRequired(currentBlock, status.waitingForEpoch)
percentEpoch := e.percentEpoch(currentBlock)
logFunc := e.logger.Debugf
if needNotify {
logFunc = e.logger.Infof
}
logFunc("New block seen [finality:%s]: %d. blockRate:%s Epoch:%d Percent:%f%% notify:%v config:%s",
newBlock.BlockFinalityType, newBlock.BlockNumber, newBlock.BlockRate, closingEpoch,
percentEpoch*maxPercent, needNotify, e.Config.String())
if needNotify {
// Notify the epoch has started
info := e.infoEpoch(currentBlock, closingEpoch)
Expand Down Expand Up @@ -179,7 +193,6 @@ func (e *EpochNotifierPerBlock) isNotificationRequired(currentBlock, lastEpochNo
thresholdPercent = maxTresholdPercent
}
if percentEpoch < thresholdPercent {
e.logger.Debugf("Block %d is at %f%% of the epoch no notify", currentBlock, percentEpoch*maxPercent)
return false, e.epochNumber(currentBlock)
}
nextEpoch := e.epochNumber(currentBlock) + 1
Expand Down
11 changes: 11 additions & 0 deletions aggsender/epoch_notifier_per_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@ import (
"github.com/stretchr/testify/require"
)

func TestConfigEpochNotifierPerBlockString(t *testing.T) {
cfg := ConfigEpochNotifierPerBlock{
StartingEpochBlock: 123,
NumBlockPerEpoch: 456,
EpochNotificationPercentage: 789,
}
require.Equal(t, "{startEpochBlock=123, sizeEpoch=456, threshold=789%}", cfg.String())
var cfg2 *ConfigEpochNotifierPerBlock
require.Equal(t, "nil", cfg2.String())
}

func TestStartingBlockEpoch(t *testing.T) {
testData := newNotifierPerBlockTestData(t, &ConfigEpochNotifierPerBlock{
StartingEpochBlock: 9,
Expand Down
7 changes: 6 additions & 1 deletion aggsender/types/block_notifier.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package types

import "github.com/0xPolygon/cdk/etherman"
import (
"time"

"github.com/0xPolygon/cdk/etherman"
)

type EventNewBlock struct {
BlockNumber uint64
BlockFinalityType etherman.BlockNumberFinality
BlockRate time.Duration
}

// BlockNotifier is the interface that wraps the basic methods to notify a new block.
Expand Down
11 changes: 10 additions & 1 deletion sync/evmdownloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sync
import (
"context"
"errors"
"fmt"
"math/big"
"time"

Expand Down Expand Up @@ -229,6 +230,11 @@ func (d *EVMDownloaderImplementation) GetEventsByBlockRange(ctx context.Context,
}
}

func filterQueryToString(query ethereum.FilterQuery) string {
return fmt.Sprintf("FromBlock: %s, ToBlock: %s, Addresses: %s, Topics: %s",
query.FromBlock.String(), query.ToBlock.String(), query.Addresses, query.Topics)
}

func (d *EVMDownloaderImplementation) GetLogs(ctx context.Context, fromBlock, toBlock uint64) []types.Log {
query := ethereum.FilterQuery{
FromBlock: new(big.Int).SetUint64(fromBlock),
Expand All @@ -249,7 +255,10 @@ func (d *EVMDownloaderImplementation) GetLogs(ctx context.Context, fromBlock, to
}

attempts++
d.log.Error("error calling FilterLogs to eth client: ", err)
d.log.Errorf("error calling FilterLogs to eth client: filter: %s err:%w ",
filterQueryToString(query),
err,
)
d.rh.Handle("getLogs", attempts)
continue
}
Expand Down
38 changes: 38 additions & 0 deletions sync/evmdownloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,44 @@ func TestGetBlockHeader(t *testing.T) {
assert.False(t, isCanceled)
}

func TestFilterQueryToString(t *testing.T) {
addr1 := common.HexToAddress("0xf000")
addr2 := common.HexToAddress("0xabcd")
query := ethereum.FilterQuery{
FromBlock: new(big.Int).SetUint64(1000),
Addresses: []common.Address{addr1, addr2},
ToBlock: new(big.Int).SetUint64(1100),
}

assert.Equal(t, "FromBlock: 1000, ToBlock: 1100, Addresses: [0x000000000000000000000000000000000000f000 0x000000000000000000000000000000000000ABcD], Topics: []", filterQueryToString(query))

query = ethereum.FilterQuery{
FromBlock: new(big.Int).SetUint64(1000),
Addresses: []common.Address{addr1, addr2},
ToBlock: new(big.Int).SetUint64(1100),
Topics: [][]common.Hash{{common.HexToHash("0x1234"), common.HexToHash("0x5678")}},
}
assert.Equal(t, "FromBlock: 1000, ToBlock: 1100, Addresses: [0x000000000000000000000000000000000000f000 0x000000000000000000000000000000000000ABcD], Topics: [[0x0000000000000000000000000000000000000000000000000000000000001234 0x0000000000000000000000000000000000000000000000000000000000005678]]", filterQueryToString(query))
}

func TestGetLogs(t *testing.T) {
mockEthClient := NewL2Mock(t)
sut := EVMDownloaderImplementation{
ethClient: mockEthClient,
adressessToQuery: []common.Address{contractAddr},
log: log.WithFields("test", "EVMDownloaderImplementation"),
rh: &RetryHandler{
RetryAfterErrorPeriod: time.Millisecond,
MaxRetryAttemptsAfterError: 5,
},
}
ctx := context.TODO()
mockEthClient.EXPECT().FilterLogs(ctx, mock.Anything).Return(nil, errors.New("foo")).Once()
mockEthClient.EXPECT().FilterLogs(ctx, mock.Anything).Return(nil, nil).Once()
logs := sut.GetLogs(ctx, 0, 1)
require.Equal(t, []types.Log{}, logs)
}

func buildAppender() LogAppenderMap {
appender := make(LogAppenderMap)
appender[eventSignature] = func(b *EVMBlock, l types.Log) error {
Expand Down
Loading

0 comments on commit 1dfea4a

Please sign in to comment.