diff --git a/beacon/client/client.go b/beacon/client/client.go index 75f410f2..a43469e3 100644 --- a/beacon/client/client.go +++ b/beacon/client/client.go @@ -20,6 +20,7 @@ import ( var ( ErrHTTPErrorResponse = errors.New("got an HTTP error response") ErrNodesUnavailable = errors.New("beacon nodes are unavailable") + ErrBlockPublish202 = errors.New("the block failed validation, but was successfully broadcast anyway. It was not integrated into the beacon node's database") ) type beaconClient struct { @@ -174,22 +175,29 @@ func (b *beaconClient) GetForkSchedule() (spec *GetForkScheduleResponse, err err return resp, err } -func (b *beaconClient) PublishBlock(block structs.SignedBeaconBlock) error { - bb, err := json.Marshal(block) - if err != nil { +func (b *beaconClient) PublishBlock(ctx context.Context, block structs.SignedBeaconBlock) error { + buff := bytes.NewBuffer(nil) + enc := json.NewEncoder(buff) + if err := enc.Encode(block); err != nil { return fmt.Errorf("fail to marshal block: %w", err) } t := prometheus.NewTimer(b.m.Timing.WithLabelValues("/eth/v1/beacon/blocks", "POST")) defer t.ObserveDuration() - resp, err := http.Post(b.beaconEndpoint.String()+"/eth/v1/beacon/blocks", "application/json", bytes.NewBuffer(bb)) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, b.beaconEndpoint.String()+"/eth/v1/beacon/blocks", buff) + if err != nil { + return fmt.Errorf("fail to publish block: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + resp, err := http.DefaultClient.Do(req) if err != nil { return fmt.Errorf("fail to publish block: %w", err) } if resp.StatusCode == 202 { // https://ethereum.github.io/beacon-APIs/#/Beacon/publishBlock - return fmt.Errorf("the block failed validation, but was successfully broadcast anyway. It was not integrated into the beacon node's database") + return ErrBlockPublish202 } else if resp.StatusCode >= 300 { ec := &struct { Code int `json:"code"` diff --git a/beacon/client/mocks/mocks.go b/beacon/client/mocks/mocks.go index e167e593..f202b47f 100644 --- a/beacon/client/mocks/mocks.go +++ b/beacon/client/mocks/mocks.go @@ -126,17 +126,17 @@ func (mr *MockBeaconNodeMockRecorder) KnownValidators(arg0 interface{}) *gomock. } // PublishBlock mocks base method. -func (m *MockBeaconNode) PublishBlock(arg0 structs.SignedBeaconBlock) error { +func (m *MockBeaconNode) PublishBlock(arg0 context.Context, arg1 structs.SignedBeaconBlock) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PublishBlock", arg0) + ret := m.ctrl.Call(m, "PublishBlock", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } // PublishBlock indicates an expected call of PublishBlock. -func (mr *MockBeaconNodeMockRecorder) PublishBlock(arg0 interface{}) *gomock.Call { +func (mr *MockBeaconNodeMockRecorder) PublishBlock(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PublishBlock", reflect.TypeOf((*MockBeaconNode)(nil).PublishBlock), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PublishBlock", reflect.TypeOf((*MockBeaconNode)(nil).PublishBlock), arg0, arg1) } // Randao mocks base method. diff --git a/beacon/client/multi.go b/beacon/client/multi.go index c30a7867..d0e6228f 100644 --- a/beacon/client/multi.go +++ b/beacon/client/multi.go @@ -15,6 +15,7 @@ import ( var ( ErrBeaconNodeSyncing = errors.New("beacon node is syncing") ErrWithdrawalsUnsupported = errors.New("withdrawals are not supported") + ErrFailedToPublish = errors.New("failed to publish") ) type BeaconNode interface { @@ -24,7 +25,7 @@ type BeaconNode interface { KnownValidators(structs.Slot) (AllValidatorsResponse, error) Genesis() (structs.GenesisInfo, error) GetForkSchedule() (*GetForkScheduleResponse, error) - PublishBlock(block structs.SignedBeaconBlock) error + PublishBlock(context.Context, structs.SignedBeaconBlock) error Randao(structs.Slot) (string, error) Endpoint() string GetWithdrawals(structs.Slot) (*GetWithdrawalsResponse, error) @@ -231,17 +232,44 @@ func (b *MultiBeaconClient) clientsByLastResponse() []BeaconNode { return instances } -func (b *MultiBeaconClient) PublishBlock(block structs.SignedBeaconBlock) (err error) { +func (b *MultiBeaconClient) PublishBlock(ctx context.Context, block structs.SignedBeaconBlock) (err error) { + resp := make(chan error, 20) + var i int for _, client := range b.clientsByLastResponse() { - if err = client.PublishBlock(block); err != nil { - b.Log.WithError(err). - WithField("endpoint", client.Endpoint()). - Warn("failed to publish block to beacon") - continue - } + i++ + c := client + go publishAsync(ctx, c, b.Log, block, resp) + } - return nil + var ( + defError error + r int + ) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case e := <-resp: + r++ + switch e { + case nil: + return nil + default: + defError = e + if r == i { + return defError + } + } + } } +} - return err +func publishAsync(ctx context.Context, client BeaconNode, l log.Logger, block structs.SignedBeaconBlock, resp chan<- error) { + err := client.PublishBlock(ctx, block) + if err != nil { + l.WithError(err). + WithField("endpoint", client.Endpoint()). + Warn("failed to publish block to beacon") + } + resp <- err } diff --git a/cmd/dreamboat/main.go b/cmd/dreamboat/main.go index 52e3aa58..6235f0d7 100644 --- a/cmd/dreamboat/main.go +++ b/cmd/dreamboat/main.go @@ -239,6 +239,12 @@ var flags = []cli.Flag{ Value: "", EnvVars: []string{"BLOCK_VALIDATION_ENDPOINT_RPC"}, }, + &cli.DurationFlag{ + Name: "block-publication-delay", + Usage: "Delay between lock publication and returning request to validator", + Value: time.Second, + EnvVars: []string{"BLOCK_PUBLICATION_DELAY"}, + }, } const ( @@ -434,6 +440,7 @@ func run() cli.ActionFunc { r := relay.NewRelay(logger, relay.RelayConfig{ BuilderSigningDomain: domainBuilder, + BlockPublishDelay: c.Duration("block-publication-delay"), ProposerSigningDomain: map[structs.ForkVersion]types.Domain{ structs.ForkBellatrix: bellatrixBeaconProposer, structs.ForkCapella: capellaBeaconProposer}, diff --git a/relay/mocks/mocks.go b/relay/mocks/mocks.go index d7871652..ec356a5d 100644 --- a/relay/mocks/mocks.go +++ b/relay/mocks/mocks.go @@ -552,15 +552,15 @@ func (m *MockBeacon) EXPECT() *MockBeaconMockRecorder { } // PublishBlock mocks base method. -func (m *MockBeacon) PublishBlock(arg0 structs.SignedBeaconBlock) error { +func (m *MockBeacon) PublishBlock(arg0 context.Context, arg1 structs.SignedBeaconBlock) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PublishBlock", arg0) + ret := m.ctrl.Call(m, "PublishBlock", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } // PublishBlock indicates an expected call of PublishBlock. -func (mr *MockBeaconMockRecorder) PublishBlock(arg0 interface{}) *gomock.Call { +func (mr *MockBeaconMockRecorder) PublishBlock(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PublishBlock", reflect.TypeOf((*MockBeacon)(nil).PublishBlock), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PublishBlock", reflect.TypeOf((*MockBeacon)(nil).PublishBlock), arg0, arg1) } diff --git a/relay/relay.go b/relay/relay.go index b2abd55d..f54559c7 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -40,6 +40,8 @@ var ( ErrInvalidTimestamp = errors.New("invalid timestamp") ErrInvalidSlot = errors.New("invalid slot") ErrEmptyBlock = errors.New("block is empty") + ErrWrongPayload = errors.New("wrong publish payload") + ErrFailedToPublish = errors.New("failed to publish block") ) type BlockValidationClient interface { @@ -94,7 +96,7 @@ type Auctioneer interface { } type Beacon interface { - PublishBlock(block structs.SignedBeaconBlock) error + PublishBlock(ctx context.Context, block structs.SignedBeaconBlock) error } type RelayConfig struct { @@ -102,6 +104,7 @@ type RelayConfig struct { ProposerSigningDomain map[structs.ForkVersion]types.Domain PubKey types.PublicKey SecretKey *bls.SecretKey + BlockPublishDelay time.Duration AllowedListedBuilders map[[48]byte]struct{} @@ -333,10 +336,11 @@ func (rs *Relay) GetPayload(ctx context.Context, m *structs.MetricGroup, payload } logger := rs.l.With(log.F{ - "method": "GetPayload", - "slot": payloadRequest.Slot(), - "blockHash": payloadRequest.BlockHash(), - "pubkey": pk, + "method": "GetPayload", + "slot": payloadRequest.Slot(), + "block_number": payloadRequest.BlockNumber(), + "blockHash": payloadRequest.BlockHash(), + "pubkey": pk, }) logger.WithField("event", "payload_requested").Info("payload requested") @@ -379,38 +383,36 @@ func (rs *Relay) GetPayload(ctx context.Context, m *structs.MetricGroup, payload "processingTimeMs": time.Since(tStart).Milliseconds(), }) - rs.runnignAsyncs.Add(1) - go func(wg *TimeoutWaitGroup, l log.Logger, rs *Relay, slot structs.Slot, payloadRequest structs.SignedBlindedBeaconBlock) { - defer wg.Done() - if rs.config.PublishBlock { - beaconBlock, err := payloadRequest.ToBeaconBlock(payload.ExecutionPayload()) - if err != nil { - l.WithField("event", "wrong_publish_payload").WithError(err).Warn("fail to create block for publication") - } else { - if err = rs.beacon.PublishBlock(beaconBlock); err != nil { - l.With(log.F{ - "slot": slot, - "block_number": payloadRequest.BlockNumber(), - }).WithField("event", "publish_error").WithError(err).Warn("fail to publish block to beacon node") - } else { - l.WithField("event", "published").Info("published block to beacon node") - } - } + if rs.config.PublishBlock { + beaconBlock, err := payloadRequest.ToBeaconBlock(payload.ExecutionPayload()) + if err != nil { + logger.WithField("event", "wrong_publish_payload").WithError(err).Error("fail to create block for publication") + return nil, ErrWrongPayload } + if err = rs.beacon.PublishBlock(ctx, beaconBlock); err != nil { + logger.WithField("event", "publish_error").WithError(err).Error("fail to publish block to beacon node") + return nil, ErrFailedToPublish + } + logger.WithField("event", "published").Info("published block to beacon node") + // Delay the return of response block publishing + time.Sleep(rs.config.BlockPublishDelay) + } - trace, err := payload.ToDeliveredTrace(payloadRequest.Slot()) + rs.runnignAsyncs.Add(1) + go func(wg *TimeoutWaitGroup, l log.Logger, rs *Relay, slot uint64) { + defer wg.Done() + trace, err := payload.ToDeliveredTrace(slot) if err != nil { - l.WithField("event", "wrong_evidence_payload").WithError(err).Warn("failed to generate delivered payload") + l.WithField("event", "wrong_evidence_payload").WithError(err).Error("failed to generate delivered payload") return } - if err := rs.das.PutDelivered(context.Background(), slot, trace, rs.config.TTL); err != nil { + if err := rs.das.PutDelivered(context.Background(), structs.Slot(slot), trace, rs.config.TTL); err != nil { l.WithField("event", "evidence_failure").WithError(err).Warn("failed to set payload after delivery") } - }(rs.runnignAsyncs, logger, rs, structs.Slot(payloadRequest.Slot()), payloadRequest) + }(rs.runnignAsyncs, logger, rs, payloadRequest.Slot()) exp := payload.ExecutionPayload() - switch forkv { case structs.ForkBellatrix: bep := exp.(*bellatrix.ExecutionPayload) @@ -450,8 +452,8 @@ func (rs *Relay) GetPayload(ctx context.Context, m *structs.MetricGroup, payload } -type TimeoutWaitGroup struct { - running int64 +type TimeoutWaitGroup struct { + running int64 done chan struct{} } @@ -464,12 +466,12 @@ func (wg *TimeoutWaitGroup) Add(i int64) { case <-wg.done: return default: - } + } atomic.AddInt64(&wg.running, i) } func (wg *TimeoutWaitGroup) Done() { - if atomic.AddInt64(&wg.running, -1) == 0 { + if atomic.AddInt64(&wg.running, -1) == 0 { close(wg.done) } }