-
Notifications
You must be signed in to change notification settings - Fork 784
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Beacon api + validator electra #5744
Changes from 227 commits
3b7132b
e6c7f14
3a41e13
9b98f4e
7c6526d
19a9479
9f6de8e
38382a3
2c2e44c
e2e82ff
e0abede
f1f9f92
3c68841
1d5f755
32357d8
c40bec9
31955c2
75ab913
5728f78
8517236
90179d4
f30246b
7c0a8f8
43c3f63
721e73f
7abb762
42a4993
ca2a946
1ddd078
3ef7c90
683de56
dd5c9a8
5e1d5ff
c8fca4f
c20fc48
7cb7653
e32dfcd
3ea3d22
07229b7
36a559e
6fe919a
c575cd6
cb8c8f5
c30f709
ab9e58a
ca09671
b807d39
19f8333
fae4a2b
f9d4a28
411fcee
6477eec
e448557
aa83e8b
9b5ea9d
e494b41
d505c04
437e851
16265ef
72548cb
08e0458
ba02ffc
6d2c396
be9c4bb
7926afe
89e4de9
f60eac6
677a94d
9bd430b
4b28872
e1dcfb6
b819d2d
40c4c00
5364ba5
9a22eb8
a97e86c
261551e
518a91a
1ab786a
a75257f
f4907ef
75f22ee
28cf796
fc2c942
b8dc628
3b1fb0a
af7ba6f
aaf8e50
c900a88
97e88dd
67ba04e
179324b
217fa9f
793764f
c53d4ac
5f73d31
812b3d7
c680164
7f54906
8468937
8506fb0
ec055f4
4f0ecf2
227aa4b
fc15736
d8941d7
0c29896
210ad2f
79a5f25
a8088f1
8e537d1
bafb5f0
f9c50bc
987abe0
82858bc
154b7a7
bb734af
469296b
3f169ef
3e10e68
9440c36
57b6a9a
1aa410c
36a7b12
aed25c4
72abfa4
75432e1
e340998
b61d244
49de63f
29ed1c5
a647a36
4013944
7d3a5df
7a408b7
f9d3545
77c630b
f25531d
772ab53
49db91b
c2c2baf
b21b108
8dc9f38
c43d1c2
f57fa87
a5ee0ed
35e07eb
d7f3c95
d5aa2d8
c4f2284
3ac3ddb
9a01b6b
d87541c
dd0d5e2
3ec21a2
795eff9
960f8c5
1d0e3f4
5acc052
4f08f6e
f049285
5070ab2
45d007a
9e84779
7af3f2e
2634a1f
dec7cff
6a4d842
6f0b784
444cd62
d264736
7521f97
4d3edfe
7fce143
4d4c268
370d511
cbb7c5d
70a2d4d
9e6e76f
a8d8989
d67270f
3977b92
6e44832
afb9122
381bbab
0e2add2
f85a124
efb8a01
dd0aa8e
536c9f8
c276af6
af98e98
09f48c5
68035eb
27ed90e
b6913ae
ebbb17b
13b1b05
339d1b8
70a80d5
7509cf6
8715589
09141ec
8fc5333
5517c78
cf030d0
68fd7a7
d137881
87fde51
a8d84d6
51a8c80
4a858b3
897f06a
806a5eb
033457c
257bcc3
c9fe10b
69ac342
6766f32
39d41ad
f405601
0c2ee92
dabb3d1
80266a8
d1357e4
d394746
c4cb8ad
386aacd
71a2ead
4065ef6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ mod validators; | |
mod version; | ||
|
||
use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3}; | ||
use crate::version::fork_versioned_response; | ||
use beacon_chain::{ | ||
attestation_verification::VerifiedAttestation, observed_operations::ObservationOutcome, | ||
validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError, | ||
|
@@ -256,12 +257,15 @@ pub fn prometheus_metrics() -> warp::filters::log::Log<impl Fn(warp::filters::lo | |
.or_else(|| starts_with("v1/validator/duties/sync")) | ||
.or_else(|| starts_with("v1/validator/attestation_data")) | ||
.or_else(|| starts_with("v1/validator/aggregate_attestation")) | ||
.or_else(|| starts_with("v2/validator/aggregate_attestation")) | ||
.or_else(|| starts_with("v1/validator/aggregate_and_proofs")) | ||
.or_else(|| starts_with("v2/validator/aggregate_and_proofs")) | ||
.or_else(|| starts_with("v1/validator/sync_committee_contribution")) | ||
.or_else(|| starts_with("v1/validator/contribution_and_proofs")) | ||
.or_else(|| starts_with("v1/validator/beacon_committee_subscriptions")) | ||
.or_else(|| starts_with("v1/validator/sync_committee_subscriptions")) | ||
.or_else(|| starts_with("v1/beacon/pool/attestations")) | ||
.or_else(|| starts_with("v2/beacon/pool/attestations")) | ||
.or_else(|| starts_with("v1/beacon/pool/sync_committees")) | ||
.or_else(|| starts_with("v1/beacon/blocks/head/root")) | ||
.or_else(|| starts_with("v1/validator/prepare_beacon_proposer")) | ||
|
@@ -1623,26 +1627,38 @@ pub fn serve<T: BeaconChainTypes>( | |
); | ||
|
||
// GET beacon/blocks/{block_id}/attestations | ||
let get_beacon_block_attestations = beacon_blocks_path_v1 | ||
let get_beacon_block_attestations = beacon_blocks_path_any | ||
.clone() | ||
.and(warp::path("attestations")) | ||
.and(warp::path::end()) | ||
.then( | ||
|block_id: BlockId, | ||
|endpoint_version: EndpointVersion, | ||
block_id: BlockId, | ||
task_spawner: TaskSpawner<T::EthSpec>, | ||
chain: Arc<BeaconChain<T>>| { | ||
task_spawner.blocking_json_task(Priority::P1, move || { | ||
task_spawner.blocking_response_task(Priority::P1, move || { | ||
let (block, execution_optimistic, finalized) = | ||
block_id.blinded_block(&chain)?; | ||
Ok(api_types::GenericResponse::from( | ||
block | ||
.message() | ||
.body() | ||
.attestations() | ||
.map(|att| att.clone_as_attestation()) | ||
.collect::<Vec<_>>(), | ||
) | ||
.add_execution_optimistic_finalized(execution_optimistic, finalized)) | ||
let fork_name = block | ||
.fork_name(&chain.spec) | ||
.map_err(inconsistent_fork_rejection)?; | ||
let atts = block | ||
.message() | ||
.body() | ||
.attestations() | ||
.map(|att| att.clone_as_attestation()) | ||
.collect::<Vec<_>>(); | ||
let res = execution_optimistic_finalized_fork_versioned_response( | ||
endpoint_version, | ||
fork_name, | ||
execution_optimistic, | ||
finalized, | ||
&atts, | ||
)?; | ||
Ok(add_consensus_version_header( | ||
warp::reply::json(&res).into_response(), | ||
fork_name, | ||
)) | ||
}) | ||
}, | ||
); | ||
|
@@ -1750,8 +1766,14 @@ pub fn serve<T: BeaconChainTypes>( | |
.and(task_spawner_filter.clone()) | ||
.and(chain_filter.clone()); | ||
|
||
let beacon_pool_path_any = any_version | ||
.and(warp::path("beacon")) | ||
.and(warp::path("pool")) | ||
.and(task_spawner_filter.clone()) | ||
.and(chain_filter.clone()); | ||
|
||
// POST beacon/pool/attestations | ||
let post_beacon_pool_attestations = beacon_pool_path | ||
let post_beacon_pool_attestations = beacon_pool_path_any | ||
.clone() | ||
.and(warp::path("attestations")) | ||
.and(warp::path::end()) | ||
|
@@ -1760,7 +1782,11 @@ pub fn serve<T: BeaconChainTypes>( | |
.and(reprocess_send_filter) | ||
.and(log_filter.clone()) | ||
.then( | ||
|task_spawner: TaskSpawner<T::EthSpec>, | ||
// V1 and V2 are identical except V2 has a consensus version header in the request. | ||
// We only require this header for SSZ deserialization, which isn't supported for | ||
// this endpoint presently. | ||
|_endpoint_version: EndpointVersion, | ||
task_spawner: TaskSpawner<T::EthSpec>, | ||
chain: Arc<BeaconChain<T>>, | ||
attestations: Vec<Attestation<T::EthSpec>>, | ||
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>, | ||
|
@@ -1781,16 +1807,17 @@ pub fn serve<T: BeaconChainTypes>( | |
); | ||
|
||
// GET beacon/pool/attestations?committee_index,slot | ||
let get_beacon_pool_attestations = beacon_pool_path | ||
let get_beacon_pool_attestations = beacon_pool_path_any | ||
.clone() | ||
.and(warp::path("attestations")) | ||
.and(warp::path::end()) | ||
.and(warp::query::<api_types::AttestationPoolQuery>()) | ||
.then( | ||
|task_spawner: TaskSpawner<T::EthSpec>, | ||
|endpoint_version: EndpointVersion, | ||
task_spawner: TaskSpawner<T::EthSpec>, | ||
chain: Arc<BeaconChain<T>>, | ||
query: api_types::AttestationPoolQuery| { | ||
task_spawner.blocking_json_task(Priority::P1, move || { | ||
task_spawner.blocking_response_task(Priority::P1, move || { | ||
let query_filter = |data: &AttestationData| { | ||
query.slot.map_or(true, |slot| slot == data.slot) | ||
&& query | ||
|
@@ -1807,20 +1834,40 @@ pub fn serve<T: BeaconChainTypes>( | |
.filter(|&att| query_filter(att.data())) | ||
.cloned(), | ||
); | ||
Ok(api_types::GenericResponse::from(attestations)) | ||
let slot = query | ||
.slot | ||
.or_else(|| { | ||
attestations | ||
.first() | ||
.map(|att| att.data().slot) | ||
.or_else(|| chain.slot_clock.now()) | ||
}) | ||
.ok_or(warp_utils::reject::custom_server_error( | ||
"unable to read slot clock".to_string(), | ||
))?; | ||
let fork_name = chain.spec.fork_name_at_slot::<T::EthSpec>(slot); | ||
let res = fork_versioned_response(endpoint_version, fork_name, &attestations)?; | ||
Ok(add_consensus_version_header( | ||
warp::reply::json(&res).into_response(), | ||
fork_name, | ||
)) | ||
}) | ||
}, | ||
); | ||
|
||
// POST beacon/pool/attester_slashings | ||
let post_beacon_pool_attester_slashings = beacon_pool_path | ||
let post_beacon_pool_attester_slashings = beacon_pool_path_any | ||
.clone() | ||
.and(warp::path("attester_slashings")) | ||
.and(warp::path::end()) | ||
.and(warp_utils::json::json()) | ||
.and(network_tx_filter.clone()) | ||
.then( | ||
|task_spawner: TaskSpawner<T::EthSpec>, | ||
// V1 and V2 are identical except V2 has a consensus version header in the request. | ||
// We only require this header for SSZ deserialization, which isn't supported for | ||
// this endpoint presently. | ||
|_endpoint_version: EndpointVersion, | ||
task_spawner: TaskSpawner<T::EthSpec>, | ||
chain: Arc<BeaconChain<T>>, | ||
slashing: AttesterSlashing<T::EthSpec>, | ||
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| { | ||
|
@@ -1857,15 +1904,29 @@ pub fn serve<T: BeaconChainTypes>( | |
); | ||
|
||
// GET beacon/pool/attester_slashings | ||
let get_beacon_pool_attester_slashings = beacon_pool_path | ||
let get_beacon_pool_attester_slashings = beacon_pool_path_any | ||
.clone() | ||
.and(warp::path("attester_slashings")) | ||
.and(warp::path::end()) | ||
.then( | ||
|task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| { | ||
task_spawner.blocking_json_task(Priority::P1, move || { | ||
let attestations = chain.op_pool.get_all_attester_slashings(); | ||
Ok(api_types::GenericResponse::from(attestations)) | ||
|endpoint_version: EndpointVersion, | ||
task_spawner: TaskSpawner<T::EthSpec>, | ||
chain: Arc<BeaconChain<T>>| { | ||
task_spawner.blocking_response_task(Priority::P1, move || { | ||
let slashings = chain.op_pool.get_all_attester_slashings(); | ||
let slot = slashings | ||
.first() | ||
.map(|slashing| slashing.attestation_1().data().slot) | ||
.or_else(|| chain.slot_clock.now()) | ||
.ok_or(warp_utils::reject::custom_server_error( | ||
"unable to read slot clock".to_string(), | ||
))?; | ||
let fork_name = chain.spec.fork_name_at_slot::<T::EthSpec>(slot); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not a reliable way to get the attester slashing fork. One can have an attester slashing electra with an attestation data from deneb. Instead we should implement There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why aren't we always calculating the fork based on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch
I think this makes sense considering return types could be mixed for a given There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. raised a spec issue as well ethereum/beacon-APIs#458 |
||
let res = fork_versioned_response(endpoint_version, fork_name, &slashings)?; | ||
Ok(add_consensus_version_header( | ||
warp::reply::json(&res).into_response(), | ||
fork_name, | ||
)) | ||
}) | ||
}, | ||
); | ||
|
@@ -3175,7 +3236,7 @@ pub fn serve<T: BeaconChainTypes>( | |
); | ||
|
||
// GET validator/aggregate_attestation?attestation_data_root,slot | ||
let get_validator_aggregate_attestation = eth_v1 | ||
let get_validator_aggregate_attestation = any_version | ||
.and(warp::path("validator")) | ||
.and(warp::path("aggregate_attestation")) | ||
.and(warp::path::end()) | ||
|
@@ -3184,29 +3245,45 @@ pub fn serve<T: BeaconChainTypes>( | |
.and(task_spawner_filter.clone()) | ||
.and(chain_filter.clone()) | ||
.then( | ||
|query: api_types::ValidatorAggregateAttestationQuery, | ||
|endpoint_version: EndpointVersion, | ||
query: api_types::ValidatorAggregateAttestationQuery, | ||
not_synced_filter: Result<(), Rejection>, | ||
task_spawner: TaskSpawner<T::EthSpec>, | ||
chain: Arc<BeaconChain<T>>| { | ||
task_spawner.blocking_json_task(Priority::P0, move || { | ||
not_synced_filter?; | ||
chain | ||
.get_pre_electra_aggregated_attestation_by_slot_and_root( | ||
let res = if endpoint_version == V2 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
let Some(committee_index) = query.committee_index else { | ||
return Err(warp_utils::reject::custom_bad_request( | ||
"missing committee index".to_string(), | ||
)); | ||
}; | ||
chain.get_aggregated_attestation_electra( | ||
query.slot, | ||
&query.attestation_data_root, | ||
committee_index, | ||
) | ||
.map_err(|e| { | ||
warp_utils::reject::custom_bad_request(format!( | ||
"unable to fetch aggregate: {:?}", | ||
e | ||
)) | ||
})? | ||
.map(api_types::GenericResponse::from) | ||
.ok_or_else(|| { | ||
warp_utils::reject::custom_not_found( | ||
"no matching aggregate found".to_string(), | ||
) | ||
}) | ||
} else if endpoint_version == V1 { | ||
// Do nothing | ||
chain.get_pre_electra_aggregated_attestation_by_slot_and_root( | ||
query.slot, | ||
&query.attestation_data_root, | ||
) | ||
} else { | ||
return Err(unsupported_version_rejection(endpoint_version)); | ||
}; | ||
res.map_err(|e| { | ||
warp_utils::reject::custom_bad_request(format!( | ||
"unable to fetch aggregate: {:?}", | ||
e | ||
)) | ||
})? | ||
.map(api_types::GenericResponse::from) | ||
.ok_or_else(|| { | ||
warp_utils::reject::custom_not_found( | ||
"no matching aggregate found".to_string(), | ||
) | ||
}) | ||
}) | ||
}, | ||
); | ||
|
@@ -3302,7 +3379,7 @@ pub fn serve<T: BeaconChainTypes>( | |
); | ||
|
||
// POST validator/aggregate_and_proofs | ||
let post_validator_aggregate_and_proofs = eth_v1 | ||
let post_validator_aggregate_and_proofs = any_version | ||
.and(warp::path("validator")) | ||
.and(warp::path("aggregate_and_proofs")) | ||
.and(warp::path::end()) | ||
|
@@ -3313,7 +3390,11 @@ pub fn serve<T: BeaconChainTypes>( | |
.and(network_tx_filter.clone()) | ||
.and(log_filter.clone()) | ||
.then( | ||
|not_synced_filter: Result<(), Rejection>, | ||
// V1 and V2 are identical except V2 has a consensus version header in the request. | ||
// We only require this header for SSZ deserialization, which isn't supported for | ||
// this endpoint presently. | ||
|_endpoint_version: EndpointVersion, | ||
not_synced_filter: Result<(), Rejection>, | ||
task_spawner: TaskSpawner<T::EthSpec>, | ||
chain: Arc<BeaconChain<T>>, | ||
aggregates: Vec<SignedAggregateAndProof<T::EthSpec>>, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if someone queries the pool at the fork boundary? There may be attestations from both forks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's a great point.. looks like we'll have to convert to the new format for all of these attestations
Seems to mean Pawan's idea is the best way to approach it: #5744 (comment)