Skip to content

Commit

Permalink
Prevent port re-use in HTTP API tests (sigp#4745)
Browse files Browse the repository at this point in the history
CI is plagued by `AddrAlreadyInUse` failures, which are caused by race conditions in allocating free ports.

This PR removes all usages of the `unused_port` crate for Lighthouse's HTTP API, in favour of passing `:0` as the listen address. As a result, the listen address isn't known ahead of time and must be read from the listening socket after it binds. This requires tying some self-referential knots, which is a little disruptive, but hopefully doesn't clash too much with Deneb 🤞

There are still a few usages of `unused_tcp4_port` left in cases where we start external processes, like the `watch` Postgres DB, Anvil, Geth, Nethermind, etc. Removing these usages is non-trivial because it's hard to read the port back from an external process after starting it with `--port 0`. We might be able to do something on Linux where we read from `/proc/`, but I'll leave that for future work.
  • Loading branch information
michaelsproul authored and Woodpile37 committed Jan 6, 2024
1 parent 57f6d5d commit 8eb0c38
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 131 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion beacon_node/beacon_chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ sensitive_url = { path = "../../common/sensitive_url" }
superstruct = "0.5.0"
hex = "0.4.2"
exit-future = "0.2.0"
unused_port = {path = "../../common/unused_port"}
oneshot_broadcast = { path = "../../common/oneshot_broadcast" }
slog-term = "2.6.0"
slog-async = "2.5.0"
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/execution_layer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ ethers-core = "1.0.2"
builder_client = { path = "../builder_client" }
fork_choice = { path = "../../consensus/fork_choice" }
mev-rs = { git = "https://github.com/ralexstokes/mev-rs", rev = "216657016d5c0889b505857c89ae42c7aa2764af" }
axum = "0.6"
hyper = "0.14"
ethereum-consensus = { git = "https://github.com/ralexstokes/ethereum-consensus", rev = "e380108" }
ssz_rs = "0.9.0"
tokio-stream = { version = "0.1.9", features = [ "sync" ] }
Expand All @@ -51,3 +53,4 @@ hash256-std-hasher = "0.15.2"
triehash = "0.8.4"
hash-db = "0.15.2"
pretty_reqwest_error = { path = "../../common/pretty_reqwest_error" }
arc-swap = "1.6.0"
59 changes: 35 additions & 24 deletions beacon_node/execution_layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! deposit-contract functionality that the `beacon_node/eth1` crate already provides.
use crate::payload_cache::PayloadCache;
use arc_swap::ArcSwapOption;
use auth::{strip_prefix, Auth, JwtKey};
use builder_client::BuilderHttpClient;
pub use engine_api::EngineCapabilities;
Expand Down Expand Up @@ -258,7 +259,7 @@ pub enum FailedCondition {

struct Inner<E: EthSpec> {
engine: Arc<Engine>,
builder: Option<BuilderHttpClient>,
builder: ArcSwapOption<BuilderHttpClient>,
execution_engine_forkchoice_lock: Mutex<()>,
suggested_fee_recipient: Option<Address>,
proposer_preparation_data: Mutex<HashMap<u64, ProposerPreparationDataEntry>>,
Expand Down Expand Up @@ -373,25 +374,9 @@ impl<T: EthSpec> ExecutionLayer<T> {
Engine::new(api, executor.clone(), &log)
};

let builder = builder_url
.map(|url| {
let builder_client = BuilderHttpClient::new(url.clone(), builder_user_agent)
.map_err(Error::Builder)?;

info!(
log,
"Using external block builder";
"builder_url" => ?url,
"builder_profit_threshold" => builder_profit_threshold,
"local_user_agent" => builder_client.get_user_agent(),
);
Ok::<_, Error>(builder_client)
})
.transpose()?;

let inner = Inner {
engine: Arc::new(engine),
builder,
builder: ArcSwapOption::empty(),
execution_engine_forkchoice_lock: <_>::default(),
suggested_fee_recipient,
proposer_preparation_data: Mutex::new(HashMap::new()),
Expand All @@ -405,19 +390,45 @@ impl<T: EthSpec> ExecutionLayer<T> {
last_new_payload_errored: RwLock::new(false),
};

Ok(Self {
let el = Self {
inner: Arc::new(inner),
})
};

if let Some(builder_url) = builder_url {
el.set_builder_url(builder_url, builder_user_agent)?;
}

Ok(el)
}
}

impl<T: EthSpec> ExecutionLayer<T> {
fn engine(&self) -> &Arc<Engine> {
&self.inner.engine
}

pub fn builder(&self) -> &Option<BuilderHttpClient> {
&self.inner.builder
pub fn builder(&self) -> Option<Arc<BuilderHttpClient>> {
self.inner.builder.load_full()
}

/// Set the builder URL after initialization.
///
/// This is useful for breaking circular dependencies between mock ELs and mock builders in
/// tests.
pub fn set_builder_url(
&self,
builder_url: SensitiveUrl,
builder_user_agent: Option<String>,
) -> Result<(), Error> {
let builder_client = BuilderHttpClient::new(builder_url.clone(), builder_user_agent)
.map_err(Error::Builder)?;
info!(
self.log(),
"Using external block builder";
"builder_url" => ?builder_url,
"builder_profit_threshold" => self.inner.builder_profit_threshold.as_u128(),
"local_user_agent" => builder_client.get_user_agent(),
);
self.inner.builder.swap(Some(Arc::new(builder_client)));
Ok(())
}

/// Cache a full payload, keyed on the `tree_hash_root` of the payload
Expand Down
61 changes: 23 additions & 38 deletions beacon_node/execution_layer/src/test_utils/mock_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ use types::{
Uint256,
};

pub type MockBuilderServer = axum::Server<
hyper::server::conn::AddrIncoming,
axum::routing::IntoMakeService<axum::routing::Router>,
>;

#[derive(Clone)]
pub enum Operation {
FeeRecipient(Address),
Expand Down Expand Up @@ -166,19 +171,25 @@ impl BidStuff for BuilderBid {
}
}

pub struct TestingBuilder<E: EthSpec> {
server: BlindedBlockProviderServer<MockBuilder<E>>,
pub builder: MockBuilder<E>,
#[derive(Clone)]
pub struct MockBuilder<E: EthSpec> {
el: ExecutionLayer<E>,
beacon_client: BeaconNodeHttpClient,
spec: ChainSpec,
context: Arc<Context>,
val_registration_cache: Arc<RwLock<HashMap<BlsPublicKey, SignedValidatorRegistration>>>,
builder_sk: SecretKey,
operations: Arc<RwLock<Vec<Operation>>>,
invalidate_signatures: Arc<RwLock<bool>>,
}

impl<E: EthSpec> TestingBuilder<E> {
pub fn new(
impl<E: EthSpec> MockBuilder<E> {
pub fn new_for_testing(
mock_el_url: SensitiveUrl,
builder_url: SensitiveUrl,
beacon_url: SensitiveUrl,
spec: ChainSpec,
executor: TaskExecutor,
) -> Self {
) -> (Self, MockBuilderServer) {
let file = NamedTempFile::new().unwrap();
let path = file.path().into();
std::fs::write(&path, hex::encode(DEFAULT_JWT_SECRET)).unwrap();
Expand Down Expand Up @@ -207,39 +218,13 @@ impl<E: EthSpec> TestingBuilder<E> {
spec,
context,
);
let port = builder_url.full.port().unwrap();
let host: Ipv4Addr = builder_url
.full
.host_str()
.unwrap()
.to_string()
.parse()
.unwrap();
let server = BlindedBlockProviderServer::new(host, port, builder.clone());
Self { server, builder }
let host: Ipv4Addr = Ipv4Addr::LOCALHOST;
let port = 0;
let provider = BlindedBlockProviderServer::new(host, port, builder.clone());
let server = provider.serve();
(builder, server)
}

pub async fn run(&self) {
let server = self.server.serve();
if let Err(err) = server.await {
println!("error while listening for incoming: {err}")
}
}
}

#[derive(Clone)]
pub struct MockBuilder<E: EthSpec> {
el: ExecutionLayer<E>,
beacon_client: BeaconNodeHttpClient,
spec: ChainSpec,
context: Arc<Context>,
val_registration_cache: Arc<RwLock<HashMap<BlsPublicKey, SignedValidatorRegistration>>>,
builder_sk: SecretKey,
operations: Arc<RwLock<Vec<Operation>>>,
invalidate_signatures: Arc<RwLock<bool>>,
}

impl<E: EthSpec> MockBuilder<E> {
pub fn new(
el: ExecutionLayer<E>,
beacon_client: BeaconNodeHttpClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ impl<T: EthSpec> MockExecutionLayer<T> {
None,
Some(JwtKey::from_slice(&DEFAULT_JWT_SECRET).unwrap()),
spec,
None,
)
}

Expand All @@ -45,7 +44,6 @@ impl<T: EthSpec> MockExecutionLayer<T> {
builder_threshold: Option<u128>,
jwt_key: Option<JwtKey>,
spec: ChainSpec,
builder_url: Option<SensitiveUrl>,
) -> Self {
let handle = executor.handle().unwrap();

Expand All @@ -68,7 +66,6 @@ impl<T: EthSpec> MockExecutionLayer<T> {

let config = Config {
execution_endpoints: vec![url],
builder_url,
secret_files: vec![path],
suggested_fee_recipient: Some(Address::repeat_byte(42)),
builder_profit_threshold: builder_threshold.unwrap_or(DEFAULT_BUILDER_THRESHOLD_WEI),
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/execution_layer/src/test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use warp::{http::StatusCode, Filter, Rejection};
use crate::EngineCapabilities;
pub use execution_block_generator::{generate_pow_block, Block, ExecutionBlockGenerator};
pub use hook::Hook;
pub use mock_builder::{Context as MockBuilderContext, MockBuilder, Operation, TestingBuilder};
pub use mock_builder::{Context as MockBuilderContext, MockBuilder, MockBuilderServer, Operation};
pub use mock_execution_layer::MockExecutionLayer;

pub const DEFAULT_TERMINAL_DIFFICULTY: u64 = 6400;
Expand Down
1 change: 0 additions & 1 deletion beacon_node/http_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ logging = { path = "../../common/logging" }
ethereum_serde_utils = "0.5.0"
operation_pool = { path = "../operation_pool" }
sensitive_url = { path = "../../common/sensitive_url" }
unused_port = { path = "../../common/unused_port" }
store = { path = "../store" }
bytes = "1.1.0"
beacon_processor = { path = "../beacon_processor" }
Expand Down
12 changes: 2 additions & 10 deletions beacon_node/http_api/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,9 @@ pub async fn create_api_server<T: BeaconChainTypes>(
test_runtime: &TestRuntime,
log: Logger,
) -> ApiServer<T::EthSpec, impl Future<Output = ()>> {
// Get a random unused port.
let port = unused_port::unused_tcp4_port().unwrap();
create_api_server_on_port(chain, test_runtime, log, port).await
}
// Use port 0 to allocate a new unused port.
let port = 0;

pub async fn create_api_server_on_port<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
test_runtime: &TestRuntime,
log: Logger,
port: u16,
) -> ApiServer<T::EthSpec, impl Future<Output = ()>> {
let (network_senders, network_receivers) = NetworkSenders::new();

// Default metadata
Expand Down
Loading

0 comments on commit 8eb0c38

Please sign in to comment.