Fixes race in detection of ClickHouse port from log file #139

Merged 5 commits on Jun 30, 2021
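The race in question: the old `discover_local_listening_port` read the ClickHouse log once, stopped at EOF, and bailed if the "Listening for http://" line had not appeared yet, even though the server may simply not have written it by then. The fix keeps re-reading past EOF and bounds the whole wait with a timeout. A condensed sketch of that pattern, with names simplified from the diff below rather than the PR's exact code:

```rust
use std::path::Path;
use std::time::Duration;

use anyhow::Context;
use tokio::{
    fs::File,
    io::{AsyncBufReadExt, BufReader},
    time::{sleep, Instant},
};

// Follow a log file until a marker line shows up, re-checking after EOF
// instead of giving up. This loops forever, so it must be cancelled by the
// caller (here, via the timeout wrapper below).
async fn wait_for_marker(path: &Path, marker: &str) -> Result<String, anyhow::Error> {
    let reader = BufReader::new(
        File::open(path).await.context("Failed to open log file")?,
    );
    let mut lines = reader.lines();
    loop {
        match lines.next_line().await.context("Failed to read log line")? {
            Some(line) if line.contains(marker) => return Ok(line),
            Some(_) => continue,
            // EOF just means the writer has not emitted the line yet.
            None => sleep(Duration::from_millis(10)).await,
        }
    }
}

// Bound the wait with a deadline, mirroring the `tokio::time::timeout_at`
// wrapper added to `discover_local_listening_port`.
async fn wait_for_marker_with_timeout(
    path: &Path,
    marker: &str,
    timeout: Duration,
) -> Result<String, anyhow::Error> {
    let deadline = Instant::now() + timeout;
    tokio::time::timeout_at(deadline, wait_for_marker(path, marker))
        .await
        .context("Timed out waiting for the marker line")?
}
```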
1 change: 1 addition & 0 deletions Cargo.toml
@@ -16,6 +16,7 @@ default-members = [
"oximeter/oximeter",
"oximeter/oximeter-macro-impl",
]
resolver = "2"

[profile.dev]
panic = "abort"
4 changes: 4 additions & 0 deletions omicron-common/Cargo.toml
@@ -77,6 +77,10 @@ features = [ "serde", "v4" ]
expectorate = "1.0.1"
serde_urlencoded = "0.7.0"

[dev-dependencies.tokio]
version = "1.7"
features = [ "test-util" ]

#
# Disable doc builds by default for our binaries to work around issue
# rust-lang/cargo#8373. These docs would not be very useful anyway.
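The `test-util` feature is what makes tokio's controllable clock (`tokio::time::pause`, `advance`, `resume`) available to the new tests in `clickhouse.rs`; the workspace-level `resolver = "2"` above presumably keeps that dev-only feature from being unified into non-test builds. A minimal sketch of what the feature enables, assuming tokio's `macros` and `rt` features are already enabled elsewhere in the manifest:

```rust
use std::time::Duration;

// With the `test-util` feature, a test can freeze tokio's clock and move it
// forward explicitly, so timing-dependent code runs instantly and
// deterministically.
#[tokio::test]
async fn paused_clock_advances_only_when_told() {
    tokio::time::pause();
    let start = tokio::time::Instant::now();

    // Move the frozen clock forward by ten seconds; no real time passes.
    tokio::time::advance(Duration::from_secs(10)).await;

    assert!(start.elapsed() >= Duration::from_secs(10));
}
```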
209 changes: 192 additions & 17 deletions omicron-common/src/dev/clickhouse.rs
@@ -1,19 +1,22 @@
//! Tools for managing ClickHouse during development

use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::time::Duration;

use anyhow::{bail, Context};
use anyhow::Context;
use tempfile::TempDir;
use tokio::{
fs::File,
io::{AsyncBufReadExt, BufReader},
time::{sleep, Instant},
};

use crate::dev::poll;

// Timeout used when starting up ClickHouse subprocess.
const CLICKHOUSE_TIMEOUT: Duration = Duration::from_secs(10);

/// A `ClickHouseInstance` is used to start and manage a ClickHouse server process.
#[derive(Debug)]
pub struct ClickHouseInstance {
@@ -82,7 +85,7 @@ impl ClickHouseInstance {
}
},
&Duration::from_millis(500),
&Duration::from_secs(10),
&CLICKHOUSE_TIMEOUT,
)
.await?;

@@ -91,7 +94,7 @@
let port = if port != 0 {
port
} else {
discover_local_listening_port(&log_path).await?
discover_local_listening_port(&log_path, CLICKHOUSE_TIMEOUT).await?
};

Ok(Self {
@@ -170,25 +173,197 @@ impl Drop for ClickHouseInstance {
// the HTTP server. This is only used if the port is chosen by the OS, not the caller.
async fn discover_local_listening_port(
path: &Path,
timeout: Duration,
) -> Result<u16, anyhow::Error> {
let timeout = Instant::now() + timeout;
tokio::time::timeout_at(timeout, find_clickhouse_port_in_log(path))
.await
.context("Failed to find ClickHouse port within timeout")?
}

// Parse the clickhouse log for a port number.
//
// NOTE: This function loops forever until the expected line is found. It should be run under a
// timeout, or some other mechanism for cancelling it.
async fn find_clickhouse_port_in_log(
path: &Path,
) -> Result<u16, anyhow::Error> {
let reader = BufReader::new(
File::open(path).await.context("Failed to open ClickHouse log file")?,
);
const NEEDLE: &str = "<Information> Application: Listening for http://";
let mut lines = reader.lines();
while let Some(line) = lines
.next_line()
.await
.context("Failed to read line from ClickHouse log file")?
{
if let Some(needle_start) = line.find(&NEEDLE) {
let address_start = needle_start + NEEDLE.len();
let address: SocketAddr =
line[address_start..].trim().parse().context(
"Failed to parse ClickHouse socket address from log",
)?;
return Ok(address.port());
loop {
let line = lines
.next_line()
.await
.context("Failed to read line from ClickHouse log file")?;
match line {
Some(line) => {
if let Some(needle_start) = line.find(&NEEDLE) {
// The address is currently written as ":PORT", but may in the future be written as
// "ADDR:PORT" or "HOST:PORT". Split on the colon, and parse the port number, rather
// than assuming the address conforms to a specific syntax.
let address_start = needle_start + NEEDLE.len();
return line[address_start..]
.trim()
.split(':')
.last()
.context("ClickHouse log file does not contain the expected HTTP listening address")?
.parse()
.context("Failed to parse ClickHouse port number from log");
}
}
None => {
// Reached EOF, just sleep for an interval and check again.
sleep(Duration::from_millis(10)).await;
}
}
}
bail!("Failed to discover port from ClickHouse log file");
}

#[cfg(test)]
mod tests {
use super::{discover_local_listening_port, CLICKHOUSE_TIMEOUT};
use std::{io::Write, sync::Arc, time::Duration};
use tempfile::NamedTempFile;
use tokio::{sync::Mutex, task::spawn, time::sleep};

const EXPECTED_PORT: u16 = 12345;

#[tokio::test]
async fn test_discover_local_listening_port() {
// Write some data to a fake log file
let mut file = NamedTempFile::new().unwrap();
writeln!(file, "A garbage line").unwrap();
writeln!(
file,
"<Information> Application: Listening for http://127.0.0.1:{}",
EXPECTED_PORT
)
.unwrap();
writeln!(file, "Another garbage line").unwrap();
file.flush().unwrap();

assert_eq!(
discover_local_listening_port(file.path(), CLICKHOUSE_TIMEOUT)
.await
.unwrap(),
EXPECTED_PORT
);
}

// A regression test for #131.
//
// The function `discover_local_listening_port` initially read from the log file until EOF, but
// there's no guarantee that ClickHouse has written the port we're searching for before the
// reader consumes the whole file. This test confirms that the file is read until the line is
// found, ignoring EOF, at least until the timeout is hit.
#[tokio::test]
async fn test_discover_local_listening_port_slow_write() {
// In this case the writer is slightly "slower" than the reader.
let writer_interval = Duration::from_millis(20);
assert_eq!(
read_log_file(CLICKHOUSE_TIMEOUT, writer_interval).await.unwrap(),
EXPECTED_PORT
);
}

// An extremely slow write test, to verify the timeout handling.
#[tokio::test]
async fn test_discover_local_listening_port_timeout() {
// In this case, the writer is _much_ slower than the reader, so that the reader times out
// entirely before finding the desired line.
let reader_timeout = Duration::from_millis(1);
let writer_interval = Duration::from_millis(100);
assert!(read_log_file(reader_timeout, writer_interval).await.is_err());
}

// Implementation of the above tests, simulating simultaneous reading/writing of the log file
//
// This uses Tokio's test utilities to manage time, rather than relying on timeouts.
async fn read_log_file(
reader_timeout: Duration,
writer_interval: Duration,
) -> Result<u16, anyhow::Error> {
async fn write_and_wait(
file: &mut NamedTempFile,
line: String,
interval: Duration,
) {
println!("Writing to log file");
writeln!(file, "{}", line).unwrap();
file.flush().unwrap();
sleep(interval).await;
}

// Start a task that slowly writes lines to the log file.
//
// NOTE: This looks overly complicated, and it is. We have to wrap this in a mutex because
// both this function, and the writer task we're spawning, need access to the file. They
// may complete in any order, and so it's not possible to give one of them ownership over
// the `NamedTempFile`. If the owning task completes, that may delete the file before the
// other task accesses it. So we need interior mutability (because one of the references is
// mutable for writing), and _this_ scope must own it.
let file = Arc::new(Mutex::new(NamedTempFile::new()?));
let path = file.lock().await.path().to_path_buf();
let writer_file = file.clone();
let writer_task = spawn(async move {
let mut file = writer_file.lock().await;
write_and_wait(
&mut file,
"A garbage line".to_string(),
writer_interval,
)
.await;
write_and_wait(
&mut file,
format!(
"<Information> Application: Listening for http://127.0.0.1:{}",
EXPECTED_PORT
),
writer_interval,
)
.await;
write_and_wait(
&mut file,
"Another garbage line".to_string(),
writer_interval,
)
.await;
});
println!("Starting reader task");
let reader_task = discover_local_listening_port(&path, reader_timeout);

// "Run" the test.
//
// We pause tokio's internal timer and advance it by the writer interval. This simulates
// the writer sleeping for a time between the write of each line, without explicitly
// sleeping the whole test thread. Note that the futures for the reader/writer tasks must
// be pinned to the stack, so that they may be polled on multiple passes through the select
// loop without consuming them.
tokio::time::pause();
tokio::pin!(writer_task);
tokio::pin!(reader_task);
let mut poll_writer = true;
let reader_result = loop {
tokio::select! {
reader_result = &mut reader_task => {
println!("Reader finished");
break reader_result;
},
writer_result = &mut writer_task, if poll_writer => {
println!("Writer finished");
let _ = writer_result.unwrap();
poll_writer = false;
},
_ = tokio::time::advance(writer_interval) => {
println!("Advancing time by {:#?}", writer_interval);
}
}
};
// Resume Tokio's timer
tokio::time::resume();
reader_result
}
}
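The comment before the `select!` loop describes the heart of `read_log_file`: pin the reader and writer futures once, poll them through `&mut ...` on every pass, and let a third branch advance the paused clock. A stripped-down sketch of that driver pattern, using a hypothetical stand-in future rather than code from this PR:

```rust
use std::time::Duration;
use tokio::time::{advance, pause, sleep};

// Drive a long-running future to completion by manually advancing tokio's
// paused clock from a `select!` loop. Pinning lets the same future be polled
// across iterations via `&mut fut` without consuming it.
#[tokio::test]
async fn drive_pinned_future_with_manual_time() {
    pause();

    // A stand-in for the "reader" future: it needs 50ms of (virtual) time.
    let work = async {
        sleep(Duration::from_millis(50)).await;
        42u32
    };
    tokio::pin!(work);

    let result = loop {
        tokio::select! {
            value = &mut work => break value,
            // Nudge the frozen clock forward until the sleep above fires.
            _ = advance(Duration::from_millis(10)) => {}
        }
    };
    assert_eq!(result, 42);
}
```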