Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

beacon/light/sync: print error log if checkpoint retrieval fails #29532

Merged
merged 4 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions beacon/blsync/block_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
)

var (
testServer1 = "testServer1"
testServer2 = "testServer2"
testServer1 = testServer("testServer1")
testServer2 = testServer("testServer2")

testBlock1 = types.NewBeaconBlock(&deneb.BeaconBlock{
Slot: 123,
Expand All @@ -51,6 +51,12 @@ var (
})
)

type testServer string

func (t testServer) Name() string {
return string(t)
}

func TestBlockSync(t *testing.T) {
ht := &testHeadTracker{}
blockSync := newBeaconBlockSync(ht)
Expand Down
9 changes: 8 additions & 1 deletion beacon/light/api/api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@ func (s *ApiServer) SendRequest(id request.ID, req request.Request) {
r.Updates, r.Committees, err = s.api.GetBestUpdatesAndCommittees(data.FirstPeriod, data.Count)
resp = r
case sync.ReqHeader:
var r sync.RespHeader
log.Debug("Beacon API: requesting header", "reqid", id, "hash", common.Hash(data))
resp, err = s.api.GetHeader(common.Hash(data))
r.Header, r.Canonical, r.Finalized, err = s.api.GetHeader(common.Hash(data))
resp = r
case sync.ReqCheckpointData:
log.Debug("Beacon API: requesting checkpoint data", "reqid", id, "hash", common.Hash(data))
resp, err = s.api.GetCheckpointData(common.Hash(data))
Expand All @@ -101,3 +103,8 @@ func (s *ApiServer) Unsubscribe() {
s.unsubscribe = nil
}
}

// Name implements request.Server
func (s *ApiServer) Name() string {
return s.api.url
}
17 changes: 10 additions & 7 deletions beacon/light/api/light_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,9 @@ func decodeFinalityUpdate(enc []byte) (types.FinalityUpdate, error) {

// GetHeader fetches and validates the beacon header with the given blockRoot.
// If blockRoot is null hash then the latest head header is fetched.
func (api *BeaconLightApi) GetHeader(blockRoot common.Hash) (types.Header, error) {
// The values of the canonical and finalized flags are also returned. Note that
// these flags are not validated.
func (api *BeaconLightApi) GetHeader(blockRoot common.Hash) (types.Header, bool, bool, error) {
var blockId string
if blockRoot == (common.Hash{}) {
blockId = "head"
Expand All @@ -300,11 +302,12 @@ func (api *BeaconLightApi) GetHeader(blockRoot common.Hash) (types.Header, error
}
resp, err := api.httpGetf("/eth/v1/beacon/headers/%s", blockId)
if err != nil {
return types.Header{}, err
return types.Header{}, false, false, err
}

var data struct {
Data struct {
Finalized bool `json:"finalized"`
Data struct {
Root common.Hash `json:"root"`
Canonical bool `json:"canonical"`
Header struct {
Expand All @@ -314,16 +317,16 @@ func (api *BeaconLightApi) GetHeader(blockRoot common.Hash) (types.Header, error
} `json:"data"`
}
if err := json.Unmarshal(resp, &data); err != nil {
return types.Header{}, err
return types.Header{}, false, false, err
}
header := data.Data.Header.Message
if blockRoot == (common.Hash{}) {
blockRoot = data.Data.Root
}
if header.Hash() != blockRoot {
return types.Header{}, errors.New("retrieved beacon header root does not match")
return types.Header{}, false, false, errors.New("retrieved beacon header root does not match")
}
return header, nil
return header, data.Data.Canonical, data.Finalized, nil
}

// GetCheckpointData fetches and validates bootstrap data belonging to the given checkpoint.
Expand Down Expand Up @@ -446,7 +449,7 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()
defer wg.Done()

// Request initial data.
if head, err := api.GetHeader(common.Hash{}); err == nil {
if head, _, _, err := api.GetHeader(common.Hash{}); err == nil {
listener.OnNewHead(head.Slot, head.Hash())
}
if signedHead, err := api.GetOptimisticHeadUpdate(); err == nil {
Expand Down
4 changes: 3 additions & 1 deletion beacon/light/request/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ type (
// the modules that do not interact with them directly.
// In order to make module testing easier, Server interface is used in
// events and modules.
Server any
Server interface {
Name() string
}
Request any
Response any
ID uint64
Expand Down
4 changes: 4 additions & 0 deletions beacon/light/request/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ type testServer struct {
canRequest int
}

func (s *testServer) Name() string {
return ""
}

func (s *testServer) subscribe(eventCb func(Event)) {
s.eventCb = eventCb
}
Expand Down
7 changes: 7 additions & 0 deletions beacon/light/request/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (
// EvResponse or EvFail. Additionally, it may also send application-defined
// events that the Modules can interpret.
type requestServer interface {
Name() string
Subscribe(eventCallback func(Event))
SendRequest(ID, Request)
Unsubscribe()
Expand All @@ -69,6 +70,7 @@ type requestServer interface {
// limit the number of parallel in-flight requests and temporarily disable
// new requests based on timeouts and response failures.
type server interface {
Server
subscribe(eventCallback func(Event))
canRequestNow() bool
sendRequest(Request) ID
Expand Down Expand Up @@ -138,6 +140,11 @@ type serverWithTimeout struct {
lastID ID
}

// Name implements request.Server
func (s *serverWithTimeout) Name() string {
return s.parent.Name()
}

// init initializes serverWithTimeout
func (s *serverWithTimeout) init(clock mclock.Clock) {
s.clock = clock
Expand Down
1 change: 1 addition & 0 deletions beacon/light/request/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ type testRequestServer struct {
eventCb func(Event)
}

func (rs *testRequestServer) Name() string { return "" }
func (rs *testRequestServer) Subscribe(eventCb func(Event)) { rs.eventCb = eventCb }
func (rs *testRequestServer) SendRequest(ID, Request) {}
func (rs *testRequestServer) Unsubscribe() {}
14 changes: 10 additions & 4 deletions beacon/light/sync/head_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
)

var (
testServer1 = "testServer1"
testServer2 = "testServer2"
testServer3 = "testServer3"
testServer4 = "testServer4"
testServer1 = testServer("testServer1")
testServer2 = testServer("testServer2")
testServer3 = testServer("testServer3")
testServer4 = testServer("testServer4")

testHead0 = types.HeadInfo{}
testHead1 = types.HeadInfo{Slot: 123, BlockRoot: common.Hash{1}}
Expand All @@ -42,6 +42,12 @@ var (
testSHead4 = types.SignedHeader{SignatureSlot: 0x6444, Header: types.Header{Slot: 0x6443, StateRoot: common.Hash{4}}}
)

type testServer string

func (t testServer) Name() string {
return string(t)
}

func TestValidatedHead(t *testing.T) {
chain := &TestCommitteeChain{}
ht := &TestHeadTracker{}
Expand Down
4 changes: 2 additions & 2 deletions beacon/light/sync/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (ts *TestScheduler) Run(testIndex int, exp ...any) {
if count == 0 {
continue
}
ts.t.Errorf("Missing %d Server.Fail(s) from server %s in test case #%d", count, server.(string), testIndex)
ts.t.Errorf("Missing %d Server.Fail(s) from server %s in test case #%d", count, server.Name(), testIndex)
}

if !reflect.DeepEqual(ts.sent[testIndex], expReqs) {
Expand Down Expand Up @@ -104,7 +104,7 @@ func (ts *TestScheduler) Send(server request.Server, req request.Request) reques

func (ts *TestScheduler) Fail(server request.Server, desc string) {
if ts.expFail[server] == 0 {
ts.t.Errorf("Unexpected Fail from server %s in test case #%d: %s", server.(string), ts.testIndex, desc)
ts.t.Errorf("Unexpected Fail from server %s in test case #%d: %s", server.Name(), ts.testIndex, desc)
return
}
ts.expFail[server]--
Expand Down
6 changes: 5 additions & 1 deletion beacon/light/sync/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ type (
Updates []*types.LightClientUpdate
Committees []*types.SerializedSyncCommittee
}
ReqHeader common.Hash
ReqHeader common.Hash
RespHeader struct {
Header types.Header
Canonical, Finalized bool
}
ReqCheckpointData common.Hash
ReqBeaconBlock common.Hash
)
139 changes: 117 additions & 22 deletions beacon/light/sync/update_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/ethereum/go-ethereum/beacon/light"
"github.com/ethereum/go-ethereum/beacon/light/request"
"github.com/ethereum/go-ethereum/beacon/params"
"github.com/ethereum/go-ethereum/beacon/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
Expand All @@ -42,47 +43,141 @@ type CheckpointInit struct {
checkpointHash common.Hash
locked request.ServerAndID
initialized bool
// per-server state is used to track the state of requesting checkpoint header
// info. Part of this info (canonical and finalized state) is not validated
// and therefore it is requested from each server separately after it has
// reported a missing checkpoint (which is also not validated info).
serverState map[request.Server]serverState
// the following fields are used to determine whether the checkpoint is on
// epoch boundary. This information is validated and therefore stored globally.
parentHash common.Hash
hasEpochInfo, epochBoundary bool
cpSlot, parentSlot uint64
}

const (
ssDefault = iota // no action yet or checkpoint requested
ssNeedHeader // checkpoint req failed, need cp header
ssHeaderRequested // cp header requested
ssNeedParent // cp header slot %32 != 0, need parent to check epoch boundary
ssParentRequested // cp parent header requested
ssPrintStatus // has all necessary info, print log message if init still not successful
ssDone // log message printed, no more action required
)

type serverState struct {
state int
hasHeader, canonical, finalized bool // stored per server because not validated
}

// NewCheckpointInit creates a new CheckpointInit.
func NewCheckpointInit(chain committeeChain, checkpointHash common.Hash) *CheckpointInit {
return &CheckpointInit{
chain: chain,
checkpointHash: checkpointHash,
serverState: make(map[request.Server]serverState),
}
}

// Process implements request.Module.
func (s *CheckpointInit) Process(requester request.Requester, events []request.Event) {
if s.initialized {
return
}
for _, event := range events {
if !event.IsRequestEvent() {
continue
}
sid, req, resp := event.RequestInfo()
if s.locked == sid {
s.locked = request.ServerAndID{}
}
if resp != nil {
if checkpoint := resp.(*types.BootstrapData); checkpoint.Header.Hash() == common.Hash(req.(ReqCheckpointData)) {
s.chain.CheckpointInit(*checkpoint)
s.initialized = true
return
switch event.Type {
case request.EvResponse, request.EvFail, request.EvTimeout:
sid, req, resp := event.RequestInfo()
if s.locked == sid {
s.locked = request.ServerAndID{}
}

requester.Fail(event.Server, "invalid checkpoint data")
if event.Type == request.EvTimeout {
continue
}
switch s.serverState[sid.Server].state {
case ssDefault:
if resp != nil {
if checkpoint := resp.(*types.BootstrapData); checkpoint.Header.Hash() == common.Hash(req.(ReqCheckpointData)) {
s.chain.CheckpointInit(*checkpoint)
s.initialized = true
return
}
requester.Fail(event.Server, "invalid checkpoint data")
}
s.serverState[sid.Server] = serverState{state: ssNeedHeader}
case ssHeaderRequested:
if resp == nil {
s.serverState[sid.Server] = serverState{state: ssPrintStatus}
continue
}
newState := serverState{
hasHeader: true,
canonical: resp.(RespHeader).Canonical,
finalized: resp.(RespHeader).Finalized,
}
s.cpSlot, s.parentHash = resp.(RespHeader).Header.Slot, resp.(RespHeader).Header.ParentRoot
if s.cpSlot%params.EpochLength == 0 {
s.hasEpochInfo, s.epochBoundary = true, true
}
if s.hasEpochInfo {
newState.state = ssPrintStatus
} else {
newState.state = ssNeedParent
}
s.serverState[sid.Server] = newState
case ssParentRequested:
s.parentSlot = resp.(RespHeader).Header.Slot
s.hasEpochInfo, s.epochBoundary = true, s.cpSlot/params.EpochLength > s.parentSlot/params.EpochLength
newState := s.serverState[sid.Server]
newState.state = ssPrintStatus
s.serverState[sid.Server] = newState
}
case request.EvUnregistered:
delete(s.serverState, event.Server)
}
}
// start a request if possible
if s.initialized || s.locked != (request.ServerAndID{}) {
return
for _, server := range requester.CanSendTo() {
switch s.serverState[server].state {
case ssDefault:
if s.locked == (request.ServerAndID{}) {
id := requester.Send(server, ReqCheckpointData(s.checkpointHash))
s.locked = request.ServerAndID{Server: server, ID: id}
}
case ssNeedHeader:
requester.Send(server, ReqHeader(s.checkpointHash))
newState := s.serverState[server]
newState.state = ssHeaderRequested
s.serverState[server] = newState
case ssNeedParent:
requester.Send(server, ReqHeader(s.parentHash))
newState := s.serverState[server]
newState.state = ssParentRequested
s.serverState[server] = newState
}
}
cs := requester.CanSendTo()
if len(cs) == 0 {
return
// print log message if necessary
for server, state := range s.serverState {
if state.state != ssPrintStatus {
continue
}
switch {
case !state.hasHeader:
log.Error("blsync: checkpoint block is not available, reported as unknown", "server", server.Name())
case !state.canonical:
log.Error("blsync: checkpoint block is not available, reported as non-canonical", "server", server.Name())
case !s.hasEpochInfo:
// should be available if hasHeader is true and state is ssPrintStatus
panic("checkpoint epoch info not available when printing retrieval status")
case !s.epochBoundary:
log.Error("blsync: checkpoint block is not first of epoch", "slot", s.cpSlot, "parent", s.parentSlot, "server", server.Name())
case !state.finalized:
log.Error("blsync: checkpoint block is reported as non-finalized", "server", server.Name())
default:
log.Error("blsync: checkpoint not available, but reported as finalized; specified checkpoint hash might be too old", "server", server.Name())
}
s.serverState[server] = serverState{state: ssDone}
}
server := cs[0]
id := requester.Send(server, ReqCheckpointData(s.checkpointHash))
s.locked = request.ServerAndID{Server: server, ID: id}
}

// ForwardUpdateSync implements request.Module; it fetches updates between the
Expand Down
Loading