Skip to content

Commit

Permalink
Publish block before returning to proposer (#330)
Browse files Browse the repository at this point in the history
* publish block first

* Add concurrent release of the execution payload

* Fix capture

* Update logging

* Log the whole URI

* add 1s delay on returning getPaylaod

* fix naming

---------

Co-authored-by: Mateusz Morusiewicz <[email protected]>
  • Loading branch information
metachris and Ruteri authored Apr 3, 2023
1 parent 75d9525 commit 75823ce
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 24 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ redis-cli DEL boost-relay/sepolia:validators-registration boost-relay/sepolia:va
#### Feature Flags

* `DISABLE_PAYLOAD_DATABASE_STORAGE` - builder API - disable storing execution payloads in the database (i.e. when using memcached as data availability redundancy)
* `DISABLE_BLOCK_PUBLISHING` - disable publishing blocks to the beacon node at the end of getPayload
* `DISABLE_LOWPRIO_BUILDERS` - reject block submissions by low-prio builders
* `FORCE_GET_HEADER_204` - force 204 as getHeader response
* `DISABLE_SSE_PAYLOAD_ATTRIBUTES` - instead of using SSE events, poll withdrawals and randao (requires custom Prysm fork)
Expand Down
44 changes: 37 additions & 7 deletions beaconclient/multi_beacon_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var (
ErrBeaconNodeSyncing = errors.New("beacon node is syncing or unavailable")
ErrBeaconNodesUnavailable = errors.New("all beacon nodes responded with error")
ErrWithdrawalsBeforeCapella = errors.New("withdrawals are not supported before capella")
ErrBeaconBlock202 = errors.New("beacon block failed validation but was still broadcast (202)")
)

// IMultiBeaconClient is the interface for the MultiBeaconClient, which can manage several beacon client instances under the hood
Expand Down Expand Up @@ -210,6 +211,12 @@ func (c *MultiBeaconClient) beaconInstancesByLastResponse() []IBeaconInstance {
return instances
}

type publishResp struct {
index int
code int
err error
}

// PublishBlock publishes the signed beacon block via https://ethereum.github.io/beacon-APIs/#/ValidatorRequiredApi/publishBlock
func (c *MultiBeaconClient) PublishBlock(block *common.SignedBeaconBlock) (code int, err error) {
log := c.log.WithFields(logrus.Fields{
Expand All @@ -218,23 +225,46 @@ func (c *MultiBeaconClient) PublishBlock(block *common.SignedBeaconBlock) (code
})

clients := c.beaconInstancesByLastResponse()

// The chan will be cleaner up automatically once the function exists even if it was still being written to
resChans := make(chan publishResp, len(clients))

for i, client := range clients {
log := log.WithField("uri", client.GetURI())
log.Debug("publishing block")
go func(index int, client IBeaconInstance) {
code, err := client.PublishBlock(block)
resChans <- publishResp{
index: index,
code: code,
err: err,
}
}(i, client)
}

if code, err = client.PublishBlock(block); err != nil {
log.WithField("statusCode", code).WithError(err).Warn("failed to publish block")
var lastErrPublishResp publishResp
for i := 0; i < len(clients); i++ {
res := <-resChans
if res.err != nil {
log.WithField("beacon", clients[res.index].GetURI()).WithField("statusCode", res.code).WithError(res.err).Error("failed to publish block")
lastErrPublishResp = res
continue
} else if res.code == 202 {
// Should the block fail full validation, a separate success response code (202) is used to indicate that the block was successfully broadcast but failed integration.
// https://ethereum.github.io/beacon-APIs/?urls.primaryName=dev#/Beacon/publishBlock
log.WithField("beacon", clients[res.index].GetURI()).WithField("statusCode", res.code).WithError(res.err).Error("block failed validation but was still broadcast")
lastErrPublishResp = res
continue
}

c.bestBeaconIndex.Store(int64(i))
c.bestBeaconIndex.Store(int64(res.index))

log.WithField("statusCode", code).Info("published block")
return code, nil
log.WithField("beacon", clients[res.index].GetURI()).WithField("statusCode", res.code).Info("published block")
return res.code, nil
}

log.WithField("statusCode", code).WithError(err).Error("failed to publish block on any CL node")
return code, err
log.Error("failed to publish block on any CL node")
return lastErrPublishResp.code, lastErrPublishResp.err
}

// GetGenesis returns the genesis info - https://ethereum.github.io/beacon-APIs/#/Beacon/getGenesis
Expand Down
29 changes: 13 additions & 16 deletions services/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ var (
numActiveValidatorProcessors = cli.GetEnvInt("NUM_ACTIVE_VALIDATOR_PROCESSORS", 10)
numValidatorRegProcessors = cli.GetEnvInt("NUM_VALIDATOR_REG_PROCESSORS", 10)
timeoutGetPayloadRetryMs = cli.GetEnvInt("GETPAYLOAD_RETRY_TIMEOUT_MS", 100)
getPayloadResponseDelayMs = cli.GetEnvInt("GETPAYLOAD_DELAY_MS", 1000)

apiReadTimeoutMs = cli.GetEnvInt("API_TIMEOUT_READ_MS", 1500)
apiReadHeaderTimeoutMs = cli.GetEnvInt("API_TIMEOUT_READHEADER_MS", 600)
Expand Down Expand Up @@ -154,7 +155,6 @@ type RelayAPI struct {

// Feature flags
ffForceGetHeader204 bool
ffDisableBlockPublishing bool
ffDisableLowPrioBuilders bool
ffDisablePayloadDBStorage bool // disable storing the execution payloads in the database
ffDisableSSEPayloadAttributes bool // instead of SSE, fall back to previous polling withdrawals+prevRandao from our custom Prysm fork
Expand Down Expand Up @@ -238,11 +238,6 @@ func NewRelayAPI(opts RelayAPIOpts) (api *RelayAPI, err error) {
api.ffForceGetHeader204 = true
}

if os.Getenv("DISABLE_BLOCK_PUBLISHING") == "1" {
api.log.Warn("env: DISABLE_BLOCK_PUBLISHING - disabling publishing blocks on getPayload")
api.ffDisableBlockPublishing = true
}

if os.Getenv("DISABLE_LOWPRIO_BUILDERS") == "1" {
api.log.Warn("env: DISABLE_LOWPRIO_BUILDERS - allowing only high-level builders")
api.ffDisableLowPrioBuilders = true
Expand Down Expand Up @@ -981,6 +976,18 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request)
}
}

// Publish the signed beacon block via beacon-node
signedBeaconBlock := SignedBlindedBeaconBlockToBeaconBlock(payload, getPayloadResp)
code, err := api.beaconClient.PublishBlock(signedBeaconBlock) // errors are logged inside
if err != nil {
log.WithError(err).WithField("code", code).Error("failed to publish block")
api.RespondError(w, http.StatusBadRequest, "failed to publish block")
return
}

// give the beacon network some time to propagate the block
time.Sleep(time.Duration(getPayloadResponseDelayMs) * time.Millisecond)

api.RespondOK(w, getPayloadResp)
log = log.WithFields(logrus.Fields{
"numTx": getPayloadResp.NumTx(),
Expand Down Expand Up @@ -1014,16 +1021,6 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request)
log.WithError(err).Error("failed to increment builder-stats after getPayload")
}
}()

// Publish the signed beacon block via beacon-node
go func() {
if api.ffDisableBlockPublishing {
log.Info("publishing the block is disabled")
return
}
signedBeaconBlock := SignedBlindedBeaconBlockToBeaconBlock(payload, getPayloadResp)
_, _ = api.beaconClient.PublishBlock(signedBeaconBlock) // errors are logged inside
}()
}

// --------------------
Expand Down

0 comments on commit 75823ce

Please sign in to comment.