Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(engine api): [3/3] Engine API retryWithTimeout #2490

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion beacon/blockchain/execution_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (s *Service) sendPostBlockFCU(
},
s.chainSpec.ActiveForkVersionForSlot(beaconBlk.GetSlot()),
)
if _, _, err = s.executionEngine.NotifyForkchoiceUpdate(ctx, req); err != nil {
if _, err = s.executionEngine.NotifyForkchoiceUpdate(ctx, req); err != nil {
return fmt.Errorf("failed forkchoice update, head %s: %w",
lph.GetBlockHash().String(),
err,
Expand Down
2 changes: 1 addition & 1 deletion beacon/blockchain/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (s *Service) forceSyncUponFinalize(
s.chainSpec.ActiveForkVersionForSlot(beaconBlock.GetSlot()),
)

switch _, _, err := s.executionEngine.NotifyForkchoiceUpdate(ctx, req); {
switch _, err := s.executionEngine.NotifyForkchoiceUpdate(ctx, req); {
case err == nil:
return nil

Expand Down
2 changes: 1 addition & 1 deletion beacon/blockchain/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type ExecutionEngine interface {
NotifyForkchoiceUpdate(
ctx context.Context,
req *ctypes.ForkchoiceUpdateRequest,
) (*engineprimitives.PayloadID, *common.ExecutionHash, error)
) (*engineprimitives.PayloadID, error)
}

// ExecutionPayload is the interface for the execution payload.
Expand Down
274 changes: 176 additions & 98 deletions execution/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package engine

import (
"context"
"time"

ctypes "github.com/berachain/beacon-kit/consensus-types/types"
engineprimitives "github.com/berachain/beacon-kit/engine-primitives/engine-primitives"
Expand All @@ -31,6 +32,15 @@ import (
"github.com/berachain/beacon-kit/log"
"github.com/berachain/beacon-kit/primitives/common"
jsonrpc "github.com/berachain/beacon-kit/primitives/net/json-rpc"
"github.com/cenkalti/backoff/v5"
)

const (
engineAPITimeout = time.Minute * 5
engineAPIInitialInterval = 50 * time.Millisecond
engineAPIRandomizationFactor = 0.5
engineAPIMultiplier = 1.5
engineAPIMaxInterval = 60 * time.Second
)

// Engine is Beacon-Kit's implementation of the `ExecutionEngine`
Expand Down Expand Up @@ -73,130 +83,198 @@ func (ee *Engine) GetPayload(
func (ee *Engine) NotifyForkchoiceUpdate(
ctx context.Context,
req *ctypes.ForkchoiceUpdateRequest,
) (*engineprimitives.PayloadID, *common.ExecutionHash, error) {
// Log the forkchoice update attempt.
) (*engineprimitives.PayloadID, error) {
hasPayloadAttributes := !req.PayloadAttributes.IsNil()
ee.metrics.markNotifyForkchoiceUpdateCalled(hasPayloadAttributes)

// Notify the execution engine of the forkchoice update.
payloadID, latestValidHash, err := ee.ec.ForkchoiceUpdated(
ctx,
req.State,
req.PayloadAttributes,
req.ForkVersion,
)
// Configure backoff.
engineAPIBackoff := backoff.ExponentialBackOff{
InitialInterval: engineAPIInitialInterval,
RandomizationFactor: engineAPIRandomizationFactor,
Multiplier: engineAPIMultiplier,
MaxInterval: engineAPIMaxInterval,
}
pID, err := backoff.Retry(ctx, func() (*engineprimitives.PayloadID, error) {
// Log the forkchoice update attempt.
ee.metrics.markNotifyForkchoiceUpdateCalled(hasPayloadAttributes)

switch {
case err == nil:
ee.metrics.markForkchoiceUpdateValid(
req.State, hasPayloadAttributes, payloadID,
// Notify the execution engine of the forkchoice update.
payloadID, _, innerErr := ee.ec.ForkchoiceUpdated(
ctx,
req.State,
req.PayloadAttributes,
req.ForkVersion,
)

// If we reached here, and we have a nil payload ID, we should log a
// warning.
if payloadID == nil && hasPayloadAttributes {
ee.logger.Warn(
"Received nil payload ID on VALID engine response",
"head_eth1_hash", req.State.HeadBlockHash,
"safe_eth1_hash", req.State.SafeBlockHash,
"finalized_eth1_hash", req.State.FinalizedBlockHash,
// NotifyForkchoiceUpdate gets called under two circumstances:
// 1. Payload Building (During PrepareProposal or
// optimistically in ProcessProposal)
// 2. FinalizeBlock
// We'll discriminate error handling based on these.
switch {
case innerErr == nil:
ee.metrics.markForkchoiceUpdateValid(
req.State, hasPayloadAttributes, payloadID,
)
return nil, nil, ErrNilPayloadOnValidResponse
}

return payloadID, latestValidHash, nil

case errors.Is(err, engineerrors.ErrSyncingPayloadStatus):
// We bubble up syncing as an error, to be able to stop
// bootstrapping from progressing in CL while EL is syncing.
ee.metrics.markForkchoiceUpdateSyncing(req.State, err)
return nil, nil, err

case errors.Is(err, engineerrors.ErrInvalidPayloadStatus):
// If we get invalid payload status, we will need to find a valid
// ancestor block and force a recovery.
ee.metrics.markForkchoiceUpdateInvalid(req.State, err)
return nil, nil, ErrBadBlockProduced

case jsonrpc.IsPreDefinedError(err):
// JSON-RPC errors are predefined and should be handled as such.
ee.metrics.markForkchoiceUpdateJSONRPCError(err)
return nil, nil, errors.Join(err, engineerrors.ErrPreDefinedJSONRPC)

default:
// All other errors are handled as undefined errors.
ee.metrics.markForkchoiceUpdateUndefinedError(err)
return nil, nil, err
// If we reached here, we have a VALID status and a nil payload ID,
// we should log a warning and error.
if payloadID == nil && hasPayloadAttributes {
ee.logger.Warn(
"Received nil payload ID on VALID engine response",
"head_eth1_hash", req.State.HeadBlockHash,
"safe_eth1_hash", req.State.SafeBlockHash,
"finalized_eth1_hash", req.State.FinalizedBlockHash,
)
// Do not retry, return the error.
return nil, ErrNilPayloadOnValidResponse
}

// We've received a valid response, no more retries.
return payloadID, nil

case errors.IsAny(innerErr, engineerrors.ErrSyncingPayloadStatus):
ee.metrics.markForkchoiceUpdateSyncing(req.State, innerErr)
// In all circumstances, keep retrying until the EVM is synced.
return nil, innerErr

case errors.Is(innerErr, engineerrors.ErrInvalidPayloadStatus):
ee.metrics.markForkchoiceUpdateInvalid(req.State, innerErr)
// During payload building, then there is an invalid
// payload and should error.
// During FinalizeBlock, something is broken because
// this should never happen.
return nil, backoff.Permanent(innerErr)

case jsonrpc.IsPreDefinedError(innerErr):
ee.metrics.markForkchoiceUpdateJSONRPCError(innerErr)
// In all circumstances, always retry on RPC Error.
return nil, innerErr

default:
ee.metrics.markForkchoiceUpdateUndefinedError(innerErr)
// Retry on unknown errors, we'll log the error and retry.
// TODO: discriminate more of these errors:
// RPC Timeout Errors
// Connection Refused Errors
// Erroneous Parsing Errors
return nil, innerErr
}
},
backoff.WithBackOff(&engineAPIBackoff),
backoff.WithMaxElapsedTime(engineAPITimeout),
)
if err != nil {
return nil, err
}

return pID, nil
}

// NotifyNewPayload notifies the execution client of the new payload.
//
//nolint:funlen // Lots of comments.
func (ee *Engine) NotifyNewPayload(
ctx context.Context,
req *ctypes.NewPayloadRequest,
) error {
// Log the new payload attempt.
ee.metrics.markNewPayloadCalled(
req.ExecutionPayload.GetBlockHash(),
req.ExecutionPayload.GetParentHash(),
)
// Configure backoff.
engineAPIBackoff := backoff.ExponentialBackOff{
InitialInterval: engineAPIInitialInterval,
RandomizationFactor: engineAPIRandomizationFactor,
Multiplier: engineAPIMultiplier,
MaxInterval: engineAPIMaxInterval,
}

// Otherwise we will send the payload to the execution client.
lastValidHash, err := ee.ec.NewPayload(
ctx,
req.ExecutionPayload,
req.VersionedHashes,
req.ParentBeaconBlockRoot,
)

// We abstract away some of the complexity and categorize status codes
// to make it easier to reason about.
switch {
case err == nil:
ee.metrics.markNewPayloadValid(
_, err := backoff.Retry(ctx, func() (*common.ExecutionHash, error) {
// Log the new payload attempt.
ee.metrics.markNewPayloadCalled(
req.ExecutionPayload.GetBlockHash(),
req.ExecutionPayload.GetParentHash(),
)

case errors.Is(err, engineerrors.ErrSyncingPayloadStatus):
// If we get accepted or syncing, we are going to optimistically
// say that the block is valid, this is utilized during syncing
// to allow the beacon-chain to continue processing blocks, while
// its execution client is fetching things over it's p2p layer.
ee.metrics.markNewPayloadSyncingPayloadStatus(
req.ExecutionPayload.GetBlockHash(),
req.ExecutionPayload.GetParentHash(),
lastValidHash, innerErr := ee.ec.NewPayload(
ctx,
req.ExecutionPayload,
req.VersionedHashes,
req.ParentBeaconBlockRoot,
)

case errors.IsAny(err, engineerrors.ErrAcceptedPayloadStatus):
ee.metrics.markNewPayloadAcceptedPayloadStatus(
req.ExecutionPayload.GetBlockHash(),
req.ExecutionPayload.GetParentHash(),
)
// NotifyNewPayload gets called under three circumstances:
// 1. ProcessProposal state transition
// 2. FinalizeBlock state transition
// We'll discriminate error handling based on these.
switch {
case innerErr == nil:
ee.metrics.markNewPayloadValid(
req.ExecutionPayload.GetBlockHash(),
req.ExecutionPayload.GetParentHash(),
)
// We've received a valid response, no more retries.
return lastValidHash, nil
case errors.Is(innerErr, engineerrors.ErrSyncingPayloadStatus):
ee.metrics.markNewPayloadSyncingPayloadStatus(
req.ExecutionPayload.GetBlockHash(),
req.ExecutionPayload.GetParentHash(),
)
// During ProcessProposal, we must be able to verify the
// block. Since we do not send a NotifyForkchoiceUpdate
// during ProcessProposal, we must retry here until EL is
// synced.
// TODO: Add way to determine if this is during FinalizeBlock.
// During FinalizeBlock, we do not need to verify the block.
// We do not need to retry here, as the following call to
// NotifyForkchoiceUpdate will inform the EL of the new head
// and then wait for it to sync.
return nil, innerErr

case errors.Is(err, engineerrors.ErrInvalidPayloadStatus):
ee.metrics.markNewPayloadInvalidPayloadStatus(
req.ExecutionPayload.GetBlockHash(),
)
case errors.IsAny(innerErr, engineerrors.ErrAcceptedPayloadStatus):
ee.metrics.markNewPayloadAcceptedPayloadStatus(
req.ExecutionPayload.GetBlockHash(),
req.ExecutionPayload.GetParentHash(),
)
// We may treat this status the same as SYNCING.
return nil, innerErr

case jsonrpc.IsPreDefinedError(err):
// Protect against possible nil value.
if lastValidHash == nil {
lastValidHash = &common.ExecutionHash{}
}
case errors.Is(innerErr, engineerrors.ErrInvalidPayloadStatus):
ee.metrics.markNewPayloadInvalidPayloadStatus(
req.ExecutionPayload.GetBlockHash(),
)
// During payload building, then there is an invalid
// payload and should error.
// During FinalizeBlock, something is broken because
// this should never happen.
return nil, backoff.Permanent(innerErr)

ee.metrics.markNewPayloadJSONRPCError(
req.ExecutionPayload.GetBlockHash(),
*lastValidHash,
err,
)
err = errors.Join(err, engineerrors.ErrPreDefinedJSONRPC)
case jsonrpc.IsPreDefinedError(innerErr):
// Protect against possible nil value.
if lastValidHash == nil {
lastValidHash = &common.ExecutionHash{}
}

default:
ee.metrics.markNewPayloadUndefinedError(
req.ExecutionPayload.GetBlockHash(),
err,
)
}
ee.metrics.markNewPayloadJSONRPCError(
req.ExecutionPayload.GetBlockHash(),
*lastValidHash,
innerErr,
)

// In all circumstances, always retry on RPC Error.
return nil, innerErr
default:
ee.metrics.markNewPayloadUndefinedError(
req.ExecutionPayload.GetBlockHash(),
innerErr,
)
// Retry on unknown errors, we'll log the error and retry.
// TODO: discriminate more of these errors:
// RPC Timeout Errors
// Connection Refused Errors
// Erroneous Parsing Errors
return nil, innerErr
}
},
backoff.WithBackOff(&engineAPIBackoff),
backoff.WithMaxElapsedTime(engineAPITimeout),
)
return err
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
cosmossdk.io/log v1.5.0
cosmossdk.io/math v1.4.0
cosmossdk.io/store v1.10.0-rc.1.0.20241218084712-ca559989da43
github.com/cenkalti/backoff/v5 v5.0.1
github.com/cometbft/cometbft v1.0.1-0.20241220100824-07c737de00ff
github.com/cometbft/cometbft/api v1.0.1-0.20241220100824-07c737de00ff
github.com/cosmos/cosmos-db v1.1.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ github.com/bytedance/sonic/loader v0.2.1 h1:1GgorWTqf12TA8mma4DDSbaQigE2wOgQo7iC
github.com/bytedance/sonic/loader v0.2.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v5 v5.0.1 h1:kGZdCHH1+eW+Yd0wftimjMuhg9zidDvNF5aGdnkkb+U=
github.com/cenkalti/backoff/v5 v5.0.1/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/cp v1.1.1 h1:nCb6ZLdB7NRaqsm91JtQTAme2SKJzXVsdPIPkyJr1MU=
github.com/cespare/cp v1.1.1/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s=
Expand Down
4 changes: 2 additions & 2 deletions payload/builder/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (pb *PayloadBuilder) RequestPayloadAsync(
attrs,
pb.chainSpec.ActiveForkVersionForSlot(slot),
)
payloadID, _, err := pb.ee.NotifyForkchoiceUpdate(ctx, req)
payloadID, err := pb.ee.NotifyForkchoiceUpdate(ctx, req)
if err != nil {
return nil, fmt.Errorf("RequestPayloadAsync failed sending forkchoice update: %w", err)
}
Expand Down Expand Up @@ -230,7 +230,7 @@ func (pb *PayloadBuilder) SendForceHeadFCU(
},
pb.chainSpec.ActiveForkVersionForSlot(slot),
)
if _, _, err = pb.ee.NotifyForkchoiceUpdate(ctx, req); err != nil {
if _, err = pb.ee.NotifyForkchoiceUpdate(ctx, req); err != nil {
return fmt.Errorf("SendForceHeadFCU failed sending forkchoice update: %w", err)
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions payload/builder/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ func (ee *stubExecutionEngine) GetPayload(

func (ee *stubExecutionEngine) NotifyForkchoiceUpdate(
context.Context, *ctypes.ForkchoiceUpdateRequest,
) (*engineprimitives.PayloadID, *common.ExecutionHash, error) {
return nil, nil, errStubNotImplemented
) (*engineprimitives.PayloadID, error) {
return nil, errStubNotImplemented
}

type stubAttributesFactory struct{}
Expand Down
Loading
Loading