Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronous block propagation #124

Merged
merged 3 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions beacon/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
var (
ErrHTTPErrorResponse = errors.New("got an HTTP error response")
ErrNodesUnavailable = errors.New("beacon nodes are unavailable")
ErrBlockPublish202 = errors.New("the block failed validation, but was successfully broadcast anyway. It was not integrated into the beacon node's database")
)

type beaconClient struct {
Expand Down Expand Up @@ -174,22 +175,29 @@ func (b *beaconClient) GetForkSchedule() (spec *GetForkScheduleResponse, err err
return resp, err
}

func (b *beaconClient) PublishBlock(block structs.SignedBeaconBlock) error {
bb, err := json.Marshal(block)
if err != nil {
func (b *beaconClient) PublishBlock(ctx context.Context, block structs.SignedBeaconBlock) error {
buff := bytes.NewBuffer(nil)
enc := json.NewEncoder(buff)
if err := enc.Encode(block); err != nil {
return fmt.Errorf("fail to marshal block: %w", err)
}

t := prometheus.NewTimer(b.m.Timing.WithLabelValues("/eth/v1/beacon/blocks", "POST"))
defer t.ObserveDuration()

resp, err := http.Post(b.beaconEndpoint.String()+"/eth/v1/beacon/blocks", "application/json", bytes.NewBuffer(bb))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, b.beaconEndpoint.String()+"/eth/v1/beacon/blocks", buff)
if err != nil {
return fmt.Errorf("fail to publish block: %w", err)
}

req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("fail to publish block: %w", err)
}

if resp.StatusCode == 202 { // https://ethereum.github.io/beacon-APIs/#/Beacon/publishBlock
return fmt.Errorf("the block failed validation, but was successfully broadcast anyway. It was not integrated into the beacon node's database")
return ErrBlockPublish202
} else if resp.StatusCode >= 300 {
ec := &struct {
Code int `json:"code"`
Expand Down
8 changes: 4 additions & 4 deletions beacon/client/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 40 additions & 10 deletions beacon/client/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
var (
ErrBeaconNodeSyncing = errors.New("beacon node is syncing")
ErrWithdrawalsUnsupported = errors.New("withdrawals are not supported")
ErrFailedToPublish = errors.New("failed to publish")
)

type BeaconNode interface {
Expand All @@ -24,7 +25,7 @@ type BeaconNode interface {
KnownValidators(structs.Slot) (AllValidatorsResponse, error)
Genesis() (structs.GenesisInfo, error)
GetForkSchedule() (*GetForkScheduleResponse, error)
PublishBlock(block structs.SignedBeaconBlock) error
PublishBlock(context.Context, structs.SignedBeaconBlock) error
Randao(structs.Slot) (string, error)
Endpoint() string
GetWithdrawals(structs.Slot) (*GetWithdrawalsResponse, error)
Expand Down Expand Up @@ -231,17 +232,46 @@ func (b *MultiBeaconClient) clientsByLastResponse() []BeaconNode {
return instances
}

func (b *MultiBeaconClient) PublishBlock(block structs.SignedBeaconBlock) (err error) {
func (b *MultiBeaconClient) PublishBlock(ctx context.Context, block structs.SignedBeaconBlock) (err error) {
resp := make(chan error, 20)
var i int
for _, client := range b.clientsByLastResponse() {
if err = client.PublishBlock(block); err != nil {
b.Log.WithError(err).
WithField("endpoint", client.Endpoint()).
Warn("failed to publish block to beacon")
continue
}
i++
c := client
go publishAsync(ctx, c, b.Log, block, resp)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we do c:=client? Can't we simply use client?

}

return nil
var (
defError error
r int
)
for {
select {
case <-ctx.Done():
return ctx.Err()
case e := <-resp:
r++
switch e {
case nil:
return nil
case ErrBlockPublish202:
return ErrFailedToPublish
default:
defError = e
if r == i {
return defError
}
}
}
}
}

return err
func publishAsync(ctx context.Context, client BeaconNode, l log.Logger, block structs.SignedBeaconBlock, resp chan<- error) {
err := client.PublishBlock(ctx, block)
if err != nil {
l.WithError(err).
WithField("endpoint", client.Endpoint()).
Warn("failed to publish block to beacon")
}
resp <- err
}
7 changes: 7 additions & 0 deletions cmd/dreamboat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,12 @@ var flags = []cli.Flag{
Value: "",
EnvVars: []string{"BLOCK_VALIDATION_ENDPOINT_RPC"},
},
&cli.DurationFlag{
Name: "block-publication-delay",
Usage: "Delay between lock publication and returning request to validator",
Value: time.Second,
EnvVars: []string{"BLOCK_PUBLICATION_DELAY"},
},
}

const (
Expand Down Expand Up @@ -434,6 +440,7 @@ func run() cli.ActionFunc {

r := relay.NewRelay(logger, relay.RelayConfig{
BuilderSigningDomain: domainBuilder,
BlockPublishDelay: c.Duration("block-publication-delay"),
ProposerSigningDomain: map[structs.ForkVersion]types.Domain{
structs.ForkBellatrix: bellatrixBeaconProposer,
structs.ForkCapella: capellaBeaconProposer},
Expand Down
8 changes: 4 additions & 4 deletions relay/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 33 additions & 31 deletions relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ var (
ErrInvalidTimestamp = errors.New("invalid timestamp")
ErrInvalidSlot = errors.New("invalid slot")
ErrEmptyBlock = errors.New("block is empty")
ErrWrongPayload = errors.New("wrong publish payload")
ErrFailedToPublish = errors.New("failed to publish block")
)

type BlockValidationClient interface {
Expand Down Expand Up @@ -94,14 +96,15 @@ type Auctioneer interface {
}

type Beacon interface {
PublishBlock(block structs.SignedBeaconBlock) error
PublishBlock(ctx context.Context, block structs.SignedBeaconBlock) error
}

type RelayConfig struct {
BuilderSigningDomain types.Domain
ProposerSigningDomain map[structs.ForkVersion]types.Domain
PubKey types.PublicKey
SecretKey *bls.SecretKey
BlockPublishDelay time.Duration

AllowedListedBuilders map[[48]byte]struct{}

Expand Down Expand Up @@ -333,10 +336,11 @@ func (rs *Relay) GetPayload(ctx context.Context, m *structs.MetricGroup, payload
}

logger := rs.l.With(log.F{
"method": "GetPayload",
"slot": payloadRequest.Slot(),
"blockHash": payloadRequest.BlockHash(),
"pubkey": pk,
"method": "GetPayload",
"slot": payloadRequest.Slot(),
"block_number": payloadRequest.BlockNumber(),
"blockHash": payloadRequest.BlockHash(),
"pubkey": pk,
})
logger.WithField("event", "payload_requested").Info("payload requested")

Expand Down Expand Up @@ -379,38 +383,36 @@ func (rs *Relay) GetPayload(ctx context.Context, m *structs.MetricGroup, payload
"processingTimeMs": time.Since(tStart).Milliseconds(),
})

rs.runnignAsyncs.Add(1)
go func(wg *TimeoutWaitGroup, l log.Logger, rs *Relay, slot structs.Slot, payloadRequest structs.SignedBlindedBeaconBlock) {
defer wg.Done()
if rs.config.PublishBlock {
beaconBlock, err := payloadRequest.ToBeaconBlock(payload.ExecutionPayload())
if err != nil {
l.WithField("event", "wrong_publish_payload").WithError(err).Warn("fail to create block for publication")
} else {
if err = rs.beacon.PublishBlock(beaconBlock); err != nil {
l.With(log.F{
"slot": slot,
"block_number": payloadRequest.BlockNumber(),
}).WithField("event", "publish_error").WithError(err).Warn("fail to publish block to beacon node")
} else {
l.WithField("event", "published").Info("published block to beacon node")
}
}
if rs.config.PublishBlock {
beaconBlock, err := payloadRequest.ToBeaconBlock(payload.ExecutionPayload())
if err != nil {
logger.WithField("event", "wrong_publish_payload").WithError(err).Error("fail to create block for publication")
return nil, ErrWrongPayload
}
if err = rs.beacon.PublishBlock(ctx, beaconBlock); err != nil {
logger.WithField("event", "publish_error").WithError(err).Error("fail to publish block to beacon node")
return nil, ErrFailedToPublish
}
logger.WithField("event", "published").Info("published block to beacon node")
// Delay the return of response block publishing
time.Sleep(rs.config.BlockPublishDelay)
}

trace, err := payload.ToDeliveredTrace(payloadRequest.Slot())
rs.runnignAsyncs.Add(1)
go func(wg *TimeoutWaitGroup, l log.Logger, rs *Relay, slot uint64) {
defer wg.Done()
trace, err := payload.ToDeliveredTrace(slot)
if err != nil {
l.WithField("event", "wrong_evidence_payload").WithError(err).Warn("failed to generate delivered payload")
l.WithField("event", "wrong_evidence_payload").WithError(err).Error("failed to generate delivered payload")
return
}

if err := rs.das.PutDelivered(context.Background(), slot, trace, rs.config.TTL); err != nil {
if err := rs.das.PutDelivered(context.Background(), structs.Slot(slot), trace, rs.config.TTL); err != nil {
l.WithField("event", "evidence_failure").WithError(err).Warn("failed to set payload after delivery")
}
}(rs.runnignAsyncs, logger, rs, structs.Slot(payloadRequest.Slot()), payloadRequest)
}(rs.runnignAsyncs, logger, rs, payloadRequest.Slot())

exp := payload.ExecutionPayload()

switch forkv {
case structs.ForkBellatrix:
bep := exp.(*bellatrix.ExecutionPayload)
Expand Down Expand Up @@ -450,8 +452,8 @@ func (rs *Relay) GetPayload(ctx context.Context, m *structs.MetricGroup, payload

}

type TimeoutWaitGroup struct {
running int64
type TimeoutWaitGroup struct {
running int64
done chan struct{}
}

Expand All @@ -464,12 +466,12 @@ func (wg *TimeoutWaitGroup) Add(i int64) {
case <-wg.done:
return
default:
}
}
atomic.AddInt64(&wg.running, i)
}

func (wg *TimeoutWaitGroup) Done() {
if atomic.AddInt64(&wg.running, -1) == 0 {
if atomic.AddInt64(&wg.running, -1) == 0 {
close(wg.done)
}
}
Expand Down