From 64a333a0707547589efa8356f7b69cd6c13dbdc9 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Thu, 11 Apr 2024 16:27:51 +0200 Subject: [PATCH 1/4] beacon/light/api: include canonical and finalized flags in header response --- beacon/light/api/api_server.go | 4 +++- beacon/light/api/light_api.go | 17 ++++++++++------- beacon/light/sync/types.go | 6 +++++- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/beacon/light/api/api_server.go b/beacon/light/api/api_server.go index da044f4b2d6e..76e2b71c985f 100755 --- a/beacon/light/api/api_server.go +++ b/beacon/light/api/api_server.go @@ -73,8 +73,10 @@ func (s *ApiServer) SendRequest(id request.ID, req request.Request) { r.Updates, r.Committees, err = s.api.GetBestUpdatesAndCommittees(data.FirstPeriod, data.Count) resp = r case sync.ReqHeader: + var r sync.RespHeader log.Debug("Beacon API: requesting header", "reqid", id, "hash", common.Hash(data)) - resp, err = s.api.GetHeader(common.Hash(data)) + r.Header, r.Canonical, r.Finalized, err = s.api.GetHeader(common.Hash(data)) + resp = r case sync.ReqCheckpointData: log.Debug("Beacon API: requesting checkpoint data", "reqid", id, "hash", common.Hash(data)) resp, err = s.api.GetCheckpointData(common.Hash(data)) diff --git a/beacon/light/api/light_api.go b/beacon/light/api/light_api.go index ceb4261c3c9c..6892407cafd7 100755 --- a/beacon/light/api/light_api.go +++ b/beacon/light/api/light_api.go @@ -291,7 +291,9 @@ func decodeFinalityUpdate(enc []byte) (types.FinalityUpdate, error) { // GetHeader fetches and validates the beacon header with the given blockRoot. // If blockRoot is null hash then the latest head header is fetched. -func (api *BeaconLightApi) GetHeader(blockRoot common.Hash) (types.Header, error) { +// The values of the canonical and finalized flags are also returned. Note that +// these flags are not validated. +func (api *BeaconLightApi) GetHeader(blockRoot common.Hash) (types.Header, bool, bool, error) { var blockId string if blockRoot == (common.Hash{}) { blockId = "head" @@ -300,11 +302,12 @@ func (api *BeaconLightApi) GetHeader(blockRoot common.Hash) (types.Header, error } resp, err := api.httpGetf("/eth/v1/beacon/headers/%s", blockId) if err != nil { - return types.Header{}, err + return types.Header{}, false, false, err } var data struct { - Data struct { + Finalized bool `json:"finalized"` + Data struct { Root common.Hash `json:"root"` Canonical bool `json:"canonical"` Header struct { @@ -314,16 +317,16 @@ func (api *BeaconLightApi) GetHeader(blockRoot common.Hash) (types.Header, error } `json:"data"` } if err := json.Unmarshal(resp, &data); err != nil { - return types.Header{}, err + return types.Header{}, false, false, err } header := data.Data.Header.Message if blockRoot == (common.Hash{}) { blockRoot = data.Data.Root } if header.Hash() != blockRoot { - return types.Header{}, errors.New("retrieved beacon header root does not match") + return types.Header{}, false, false, errors.New("retrieved beacon header root does not match") } - return header, nil + return header, data.Data.Canonical, data.Finalized, nil } // GetCheckpointData fetches and validates bootstrap data belonging to the given checkpoint. @@ -446,7 +449,7 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func() defer wg.Done() // Request initial data. - if head, err := api.GetHeader(common.Hash{}); err == nil { + if head, _, _, err := api.GetHeader(common.Hash{}); err == nil { listener.OnNewHead(head.Slot, head.Hash()) } if signedHead, err := api.GetOptimisticHeadUpdate(); err == nil { diff --git a/beacon/light/sync/types.go b/beacon/light/sync/types.go index 6449ae842d00..8aa4c95f46ea 100644 --- a/beacon/light/sync/types.go +++ b/beacon/light/sync/types.go @@ -36,7 +36,11 @@ type ( Updates []*types.LightClientUpdate Committees []*types.SerializedSyncCommittee } - ReqHeader common.Hash + ReqHeader common.Hash + RespHeader struct { + Header types.Header + Canonical, Finalized bool + } ReqCheckpointData common.Hash ReqBeaconBlock common.Hash ) From 1f464ec471167bf5f8cde6508b991133031422f1 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Mon, 15 Apr 2024 09:06:33 +0200 Subject: [PATCH 2/4] beacon/light/sync: print error log if checkpoint retrieval fails --- beacon/blsync/block_sync_test.go | 10 +- beacon/light/api/api_server.go | 5 + beacon/light/request/scheduler.go | 4 +- beacon/light/request/scheduler_test.go | 4 + beacon/light/request/server.go | 7 ++ beacon/light/request/server_test.go | 1 + beacon/light/sync/head_sync_test.go | 14 ++- beacon/light/sync/test_helpers.go | 4 +- beacon/light/sync/update_sync.go | 138 +++++++++++++++++++++---- 9 files changed, 156 insertions(+), 31 deletions(-) diff --git a/beacon/blsync/block_sync_test.go b/beacon/blsync/block_sync_test.go index 73ae89ae734f..0525e95a8932 100644 --- a/beacon/blsync/block_sync_test.go +++ b/beacon/blsync/block_sync_test.go @@ -28,8 +28,8 @@ import ( ) var ( - testServer1 = "testServer1" - testServer2 = "testServer2" + testServer1 = testServer("testServer1") + testServer2 = testServer("testServer2") testBlock1 = types.NewBeaconBlock(&deneb.BeaconBlock{ Slot: 123, @@ -51,6 +51,12 @@ var ( }) ) +type testServer string + +func (t testServer) Name() string { + return string(t) +} + func TestBlockSync(t *testing.T) { ht := &testHeadTracker{} blockSync := newBeaconBlockSync(ht) diff --git a/beacon/light/api/api_server.go b/beacon/light/api/api_server.go index 76e2b71c985f..4b885cb8e101 100755 --- a/beacon/light/api/api_server.go +++ b/beacon/light/api/api_server.go @@ -103,3 +103,8 @@ func (s *ApiServer) Unsubscribe() { s.unsubscribe = nil } } + +// Name implements request.Server +func (s *ApiServer) Name() string { + return s.api.url +} diff --git a/beacon/light/request/scheduler.go b/beacon/light/request/scheduler.go index 4b8f6ce5703a..e80daf805e86 100644 --- a/beacon/light/request/scheduler.go +++ b/beacon/light/request/scheduler.go @@ -93,7 +93,9 @@ type ( // the modules that do not interact with them directly. // In order to make module testing easier, Server interface is used in // events and modules. - Server any + Server interface { + Name() string + } Request any Response any ID uint64 diff --git a/beacon/light/request/scheduler_test.go b/beacon/light/request/scheduler_test.go index 7d5a56707864..5cd4965644b4 100644 --- a/beacon/light/request/scheduler_test.go +++ b/beacon/light/request/scheduler_test.go @@ -70,6 +70,10 @@ type testServer struct { canRequest int } +func (s *testServer) Name() string { + return "" +} + func (s *testServer) subscribe(eventCb func(Event)) { s.eventCb = eventCb } diff --git a/beacon/light/request/server.go b/beacon/light/request/server.go index bcb8744b38a4..9f3b09b81e80 100644 --- a/beacon/light/request/server.go +++ b/beacon/light/request/server.go @@ -58,6 +58,7 @@ const ( // EvResponse or EvFail. Additionally, it may also send application-defined // events that the Modules can interpret. type requestServer interface { + Name() string Subscribe(eventCallback func(Event)) SendRequest(ID, Request) Unsubscribe() @@ -69,6 +70,7 @@ type requestServer interface { // limit the number of parallel in-flight requests and temporarily disable // new requests based on timeouts and response failures. type server interface { + Server subscribe(eventCallback func(Event)) canRequestNow() bool sendRequest(Request) ID @@ -138,6 +140,11 @@ type serverWithTimeout struct { lastID ID } +// Name implements request.Server +func (s *serverWithTimeout) Name() string { + return s.parent.Name() +} + // init initializes serverWithTimeout func (s *serverWithTimeout) init(clock mclock.Clock) { s.clock = clock diff --git a/beacon/light/request/server_test.go b/beacon/light/request/server_test.go index b6b9edf9a056..38629cb8c464 100644 --- a/beacon/light/request/server_test.go +++ b/beacon/light/request/server_test.go @@ -153,6 +153,7 @@ type testRequestServer struct { eventCb func(Event) } +func (rs *testRequestServer) Name() string { return "" } func (rs *testRequestServer) Subscribe(eventCb func(Event)) { rs.eventCb = eventCb } func (rs *testRequestServer) SendRequest(ID, Request) {} func (rs *testRequestServer) Unsubscribe() {} diff --git a/beacon/light/sync/head_sync_test.go b/beacon/light/sync/head_sync_test.go index 2f75487f161c..a2870b2732b2 100644 --- a/beacon/light/sync/head_sync_test.go +++ b/beacon/light/sync/head_sync_test.go @@ -24,10 +24,10 @@ import ( ) var ( - testServer1 = "testServer1" - testServer2 = "testServer2" - testServer3 = "testServer3" - testServer4 = "testServer4" + testServer1 = testServer("testServer1") + testServer2 = testServer("testServer2") + testServer3 = testServer("testServer3") + testServer4 = testServer("testServer4") testHead0 = types.HeadInfo{} testHead1 = types.HeadInfo{Slot: 123, BlockRoot: common.Hash{1}} @@ -42,6 +42,12 @@ var ( testSHead4 = types.SignedHeader{SignatureSlot: 0x6444, Header: types.Header{Slot: 0x6443, StateRoot: common.Hash{4}}} ) +type testServer string + +func (t testServer) Name() string { + return string(t) +} + func TestValidatedHead(t *testing.T) { chain := &TestCommitteeChain{} ht := &TestHeadTracker{} diff --git a/beacon/light/sync/test_helpers.go b/beacon/light/sync/test_helpers.go index a1ca2b590993..9f57ceebe4d8 100644 --- a/beacon/light/sync/test_helpers.go +++ b/beacon/light/sync/test_helpers.go @@ -75,7 +75,7 @@ func (ts *TestScheduler) Run(testIndex int, exp ...any) { if count == 0 { continue } - ts.t.Errorf("Missing %d Server.Fail(s) from server %s in test case #%d", count, server.(string), testIndex) + ts.t.Errorf("Missing %d Server.Fail(s) from server %s in test case #%d", count, server.Name(), testIndex) } if !reflect.DeepEqual(ts.sent[testIndex], expReqs) { @@ -104,7 +104,7 @@ func (ts *TestScheduler) Send(server request.Server, req request.Request) reques func (ts *TestScheduler) Fail(server request.Server, desc string) { if ts.expFail[server] == 0 { - ts.t.Errorf("Unexpected Fail from server %s in test case #%d: %s", server.(string), ts.testIndex, desc) + ts.t.Errorf("Unexpected Fail from server %s in test case #%d: %s", server.Name(), ts.testIndex, desc) return } ts.expFail[server]-- diff --git a/beacon/light/sync/update_sync.go b/beacon/light/sync/update_sync.go index 533e470fb022..c986d3cf3cda 100644 --- a/beacon/light/sync/update_sync.go +++ b/beacon/light/sync/update_sync.go @@ -21,6 +21,7 @@ import ( "github.com/ethereum/go-ethereum/beacon/light" "github.com/ethereum/go-ethereum/beacon/light/request" + "github.com/ethereum/go-ethereum/beacon/params" "github.com/ethereum/go-ethereum/beacon/types" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" @@ -42,6 +43,31 @@ type CheckpointInit struct { checkpointHash common.Hash locked request.ServerAndID initialized bool + // per-server state is used to track the state of requesting checkpoint header + // info. Part of this info (canonical and finalized state) is not validated + // and therefore it is requested from each server separately after it has + // reported a missing checkpoint (which is also not validated info). + serverState map[request.Server]serverState + // the following fields are used to determine whether the checkpoint is on + // epoch boundary. This information is validated and therefore stored globally. + parentHash common.Hash + hasEpochInfo, epochBoundary bool + cpSlot, parentSlot uint64 +} + +const ( + ssDefault = iota // no action yet or checkpoint requested + ssNeedHeader // checkpoint req failed, need cp header + ssHeaderRequested // cp header requested + ssNeedParent // cp header slot %32 != 0, need parent to check epoch boundary + ssParentRequested // cp parent header requested + ssPrintStatus // has all necessary info, print log message if init still not successful + ssDone // log message printed, no more action required +) + +type serverState struct { + state int + hasHeader, canonical, finalized bool // stored per server because not validated } // NewCheckpointInit creates a new CheckpointInit. @@ -49,40 +75,108 @@ func NewCheckpointInit(chain committeeChain, checkpointHash common.Hash) *Checkp return &CheckpointInit{ chain: chain, checkpointHash: checkpointHash, + serverState: make(map[request.Server]serverState), } } // Process implements request.Module. func (s *CheckpointInit) Process(requester request.Requester, events []request.Event) { + if s.initialized { + return + } for _, event := range events { - if !event.IsRequestEvent() { - continue - } - sid, req, resp := event.RequestInfo() - if s.locked == sid { - s.locked = request.ServerAndID{} - } - if resp != nil { - if checkpoint := resp.(*types.BootstrapData); checkpoint.Header.Hash() == common.Hash(req.(ReqCheckpointData)) { - s.chain.CheckpointInit(*checkpoint) - s.initialized = true - return + switch event.Type { + case request.EvResponse, request.EvFail, request.EvTimeout: + sid, req, resp := event.RequestInfo() + if s.locked == sid { + s.locked = request.ServerAndID{} } - - requester.Fail(event.Server, "invalid checkpoint data") + if event.Type == request.EvTimeout { + continue + } + switch s.serverState[sid.Server].state { + case ssDefault: + if resp != nil { + if checkpoint := resp.(*types.BootstrapData); checkpoint.Header.Hash() == common.Hash(req.(ReqCheckpointData)) { + s.chain.CheckpointInit(*checkpoint) + s.initialized = true + return + } + requester.Fail(event.Server, "invalid checkpoint data") + } + s.serverState[sid.Server] = serverState{state: ssNeedHeader} + case ssHeaderRequested: + if resp == nil { + s.serverState[sid.Server] = serverState{state: ssPrintStatus} + continue + } + newState := serverState{ + hasHeader: true, + canonical: resp.(RespHeader).Canonical, + finalized: resp.(RespHeader).Finalized, + } + s.cpSlot, s.parentHash = resp.(RespHeader).Header.Slot, resp.(RespHeader).Header.ParentRoot + if s.cpSlot%params.EpochLength == 0 { + s.hasEpochInfo, s.epochBoundary = true, true + } + if s.hasEpochInfo { + newState.state = ssPrintStatus + } else { + newState.state = ssNeedParent + } + s.serverState[sid.Server] = newState + case ssParentRequested: + s.parentSlot = resp.(RespHeader).Header.Slot + s.hasEpochInfo, s.epochBoundary = true, s.cpSlot/params.EpochLength > s.parentSlot/params.EpochLength + newState := s.serverState[sid.Server] + newState.state = ssPrintStatus + s.serverState[sid.Server] = newState + } + case request.EvUnregistered: + delete(s.serverState, event.Server) } } // start a request if possible - if s.initialized || s.locked != (request.ServerAndID{}) { - return + for _, server := range requester.CanSendTo() { + switch s.serverState[server].state { + case ssDefault: + if s.locked == (request.ServerAndID{}) { + id := requester.Send(server, ReqCheckpointData(s.checkpointHash)) + s.locked = request.ServerAndID{Server: server, ID: id} + } + case ssNeedHeader: + requester.Send(server, ReqHeader(s.checkpointHash)) + newState := s.serverState[server] + newState.state = ssHeaderRequested + s.serverState[server] = newState + case ssNeedParent: + requester.Send(server, ReqHeader(s.parentHash)) + newState := s.serverState[server] + newState.state = ssParentRequested + s.serverState[server] = newState + } } - cs := requester.CanSendTo() - if len(cs) == 0 { - return + // print log message if necessary + for server, state := range s.serverState { + if state.state != ssPrintStatus { + continue + } + switch { + case !state.hasHeader: + log.Error("Checkpoint not available, block is reported as unknown", "server", server.Name()) + case !state.canonical: + log.Error("Checkpoint not available, block is reported as non-canonical", "server", server.Name()) + case !s.hasEpochInfo: + panic(nil) // should be available if hasHeader is true and state is ssPrintStatus + case !s.epochBoundary: + log.Error("Checkpoint not available, block is not on epoch boundary", "slot", s.cpSlot, "parent slot", s.parentSlot, "server", server.Name()) + case !state.finalized: + log.Error("Checkpoint not available, block is reported as non-finalized", "server", server.Name()) + default: + log.Error("Checkpoint not available, block is reported as canonical and finalized; specified checkpoint might be too old", "server", server.Name()) + } + s.serverState[server] = serverState{state: ssDone} } - server := cs[0] - id := requester.Send(server, ReqCheckpointData(s.checkpointHash)) - s.locked = request.ServerAndID{Server: server, ID: id} } // ForwardUpdateSync implements request.Module; it fetches updates between the From 7f75da46ba552a4cdcaff818fccaf3f6fe694afc Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Wed, 17 Apr 2024 10:17:25 +0200 Subject: [PATCH 3/4] beacon/light/sync: add panic message --- beacon/light/sync/update_sync.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/beacon/light/sync/update_sync.go b/beacon/light/sync/update_sync.go index c986d3cf3cda..d4c99db903f0 100644 --- a/beacon/light/sync/update_sync.go +++ b/beacon/light/sync/update_sync.go @@ -167,7 +167,8 @@ func (s *CheckpointInit) Process(requester request.Requester, events []request.E case !state.canonical: log.Error("Checkpoint not available, block is reported as non-canonical", "server", server.Name()) case !s.hasEpochInfo: - panic(nil) // should be available if hasHeader is true and state is ssPrintStatus + // should be available if hasHeader is true and state is ssPrintStatus + panic("Checkpoint epoch info not available when printing retrieval status") case !s.epochBoundary: log.Error("Checkpoint not available, block is not on epoch boundary", "slot", s.cpSlot, "parent slot", s.parentSlot, "server", server.Name()) case !state.finalized: From f5e8ab4d311bd0f475383c5f03619a324e7a9898 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Mon, 22 Apr 2024 11:52:23 +0200 Subject: [PATCH 4/4] beacon/light: update error messages --- beacon/light/sync/update_sync.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/beacon/light/sync/update_sync.go b/beacon/light/sync/update_sync.go index d4c99db903f0..71801b1b600f 100644 --- a/beacon/light/sync/update_sync.go +++ b/beacon/light/sync/update_sync.go @@ -163,18 +163,18 @@ func (s *CheckpointInit) Process(requester request.Requester, events []request.E } switch { case !state.hasHeader: - log.Error("Checkpoint not available, block is reported as unknown", "server", server.Name()) + log.Error("blsync: checkpoint block is not available, reported as unknown", "server", server.Name()) case !state.canonical: - log.Error("Checkpoint not available, block is reported as non-canonical", "server", server.Name()) + log.Error("blsync: checkpoint block is not available, reported as non-canonical", "server", server.Name()) case !s.hasEpochInfo: // should be available if hasHeader is true and state is ssPrintStatus - panic("Checkpoint epoch info not available when printing retrieval status") + panic("checkpoint epoch info not available when printing retrieval status") case !s.epochBoundary: - log.Error("Checkpoint not available, block is not on epoch boundary", "slot", s.cpSlot, "parent slot", s.parentSlot, "server", server.Name()) + log.Error("blsync: checkpoint block is not first of epoch", "slot", s.cpSlot, "parent", s.parentSlot, "server", server.Name()) case !state.finalized: - log.Error("Checkpoint not available, block is reported as non-finalized", "server", server.Name()) + log.Error("blsync: checkpoint block is reported as non-finalized", "server", server.Name()) default: - log.Error("Checkpoint not available, block is reported as canonical and finalized; specified checkpoint might be too old", "server", server.Name()) + log.Error("blsync: checkpoint not available, but reported as finalized; specified checkpoint hash might be too old", "server", server.Name()) } s.serverState[server] = serverState{state: ssDone} }