Skip to content

Commit

Permalink
feat: ability to access container logs (#633)
Browse files Browse the repository at this point in the history
Closes #502

Exposes `stdout` and `stderr` methods, which returns:
- `tokio::io::AsyncBufRead` impl for async container
- `std::io::BufRead` impl for sync container (under `blocking` feature)

Also, small alignment is performed (related to #617):
- rename the `ExecResult` `stdout`/`stderr` methods to match the
container methods
  • Loading branch information
DDtKey authored May 19, 2024
1 parent 9e052bc commit cb65ec2
Show file tree
Hide file tree
Showing 8 changed files with 264 additions and 37 deletions.
2 changes: 1 addition & 1 deletion testcontainers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ signal-hook = { version = "0.3", optional = true }
thiserror = "1.0.60"
tokio = { version = "1", features = ["macros", "fs", "rt-multi-thread"] }
tokio-stream = "0.1.15"
tokio-util = { version = "0.7.10", features = ["io-util"] }
tokio-util = { version = "0.7.10", features = ["io"] }
url = { version = "2", features = ["serde"] }

[features]
Expand Down
76 changes: 74 additions & 2 deletions testcontainers/src/core/containers/async_container.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use core::panic;
use std::{fmt, net::IpAddr, str::FromStr, sync::Arc};
use std::{fmt, net::IpAddr, pin::Pin, str::FromStr, sync::Arc};

use tokio::runtime::RuntimeFlavor;
use tokio::{io::AsyncBufRead, runtime::RuntimeFlavor};

use crate::{
core::{
Expand Down Expand Up @@ -240,6 +240,7 @@ where
})
}

/// Starts the container.
pub async fn start(&self) {
self.docker_client.start(&self.id).await;
for cmd in self
Expand All @@ -250,12 +251,14 @@ where
}
}

/// Stops the container (not the same with `pause`).
pub async fn stop(&self) {
log::debug!("Stopping docker container {}", self.id);

self.docker_client.stop(&self.id).await
}

/// Removes the container.
pub async fn rm(mut self) {
log::debug!("Deleting docker container {}", self.id);

Expand All @@ -267,6 +270,18 @@ where
self.dropped = true;
}

/// Returns an asynchronous reader for stdout.
pub fn stdout(&self) -> Pin<Box<dyn AsyncBufRead + '_>> {
let stdout = self.docker_client.stdout_logs(&self.id);
Box::pin(tokio_util::io::StreamReader::new(stdout.into_inner()))
}

/// Returns an asynchronous reader for stderr.
pub fn stderr(&self) -> Pin<Box<dyn AsyncBufRead + '_>> {
let stderr = self.docker_client.stderr_logs(&self.id);
Box::pin(tokio_util::io::StreamReader::new(stderr.into_inner()))
}

async fn block_until_ready(&self) -> Result<(), WaitContainerError> {
self.docker_client
.block_until_ready(self.id(), &self.image().ready_conditions())
Expand Down Expand Up @@ -313,3 +328,60 @@ where
}
}
}

#[cfg(test)]
mod tests {
use tokio::io::{AsyncBufReadExt, AsyncReadExt};

use super::*;
use crate::{images::generic::GenericImage, runners::AsyncRunner};

#[tokio::test]
async fn async_logs_are_accessible() {
let image = GenericImage::new("testcontainers/helloworld", "1.1.0");
let container = RunnableImage::from(image).start().await;

let mut stderr_lines = container.stderr().lines();

let expected_messages = [
"DELAY_START_MSEC: 0",
"Sleeping for 0 ms",
"Starting server on port 8080",
"Sleeping for 0 ms",
"Starting server on port 8081",
"Ready, listening on 8080 and 8081",
];
for expected_message in expected_messages {
let line = stderr_lines.next_line().await.unwrap().unwrap();
assert!(
line.contains(expected_message),
"Log message ('{line}') doesn't contain expected message ('{expected_message}')"
);
}

// logs are accessible after container is stopped
container.stop().await;

// stdout is empty
let mut stdout = String::new();
container
.stdout()
.read_to_string(&mut stdout)
.await
.unwrap();
assert_eq!(stdout, "");
// stderr contains 6 lines
let mut stderr = String::new();
container
.stderr()
.read_to_string(&mut stderr)
.await
.unwrap();
assert_eq!(
stderr.lines().count(),
6,
"unexpected stderr size: {}",
stderr
);
}
}
30 changes: 15 additions & 15 deletions testcontainers/src/core/containers/async_container/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{fmt, io, pin::Pin, sync::Arc};

use bytes::Bytes;
use futures::stream::BoxStream;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio::io::{AsyncBufRead, AsyncReadExt};

use crate::core::client::Client;

Expand All @@ -24,31 +24,31 @@ impl<'a> ExecResult<'a> {
.map(|exec| exec.exit_code)
}

/// Returns an asynchronous reader for stdout.
pub fn stdout<'b>(&'b mut self) -> Pin<Box<dyn AsyncBufRead + 'b>> {
Box::pin(tokio_util::io::StreamReader::new(&mut self.stdout))
}

/// Returns an asynchronous reader for stderr.
pub fn stderr<'b>(&'b mut self) -> Pin<Box<dyn AsyncBufRead + 'b>> {
Box::pin(tokio_util::io::StreamReader::new(&mut self.stderr))
}

/// Returns stdout as a vector of bytes.
/// If you want to read stdout in asynchronous manner, use `stdout_reader` instead.
pub async fn stdout(&mut self) -> Result<Vec<u8>, io::Error> {
pub async fn stdout_to_vec(&mut self) -> Result<Vec<u8>, io::Error> {
let mut stdout = Vec::new();
self.stdout_reader().read_to_end(&mut stdout).await?;
self.stdout().read_to_end(&mut stdout).await?;
Ok(stdout)
}

/// Returns stderr as a vector of bytes.
/// If you want to read stderr in asynchronous manner, use `stderr_reader` instead.
pub async fn stderr(&mut self) -> Result<Vec<u8>, io::Error> {
pub async fn stderr_to_vec(&mut self) -> Result<Vec<u8>, io::Error> {
let mut stderr = Vec::new();
self.stderr_reader().read_to_end(&mut stderr).await?;
self.stderr().read_to_end(&mut stderr).await?;
Ok(stderr)
}

/// Returns an asynchronous reader for stdout.
pub fn stdout_reader<'b>(&'b mut self) -> Pin<Box<dyn AsyncRead + 'b>> {
Box::pin(tokio_util::io::StreamReader::new(&mut self.stdout))
}

/// Returns an asynchronous reader for stderr.
pub fn stderr_reader<'b>(&'b mut self) -> Pin<Box<dyn AsyncRead + 'b>> {
Box::pin(tokio_util::io::StreamReader::new(&mut self.stderr))
}
}

impl fmt::Debug for ExecResult<'_> {
Expand Down
65 changes: 63 additions & 2 deletions testcontainers/src/core/containers/sync_container.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::{fmt, net::IpAddr};
use std::{fmt, io::BufRead, net::IpAddr};

use crate::{
core::{env, errors::ExecError, ports::Ports, ExecCommand},
ContainerAsync, Image,
};

pub(super) mod exec;
mod sync_reader;

/// Represents a running docker container.
///
Expand Down Expand Up @@ -139,20 +140,39 @@ where
})
}

/// Stops the container (not the same with `pause`).
pub fn stop(&self) {
self.rt().block_on(self.async_impl().stop());
}

/// Starts the container.
pub fn start(&self) {
self.rt().block_on(self.async_impl().start());
}

/// Removes the container.
pub fn rm(mut self) {
if let Some(active) = self.inner.take() {
active.runtime.block_on(active.async_impl.rm());
}
}

/// Returns a reader for stdout.
pub fn stdout(&self) -> Box<dyn BufRead + '_> {
Box::new(sync_reader::SyncReadBridge::new(
self.async_impl().stdout(),
self.rt(),
))
}

/// Returns a reader for stderr.
pub fn stderr(&self) -> Box<dyn BufRead + '_> {
Box::new(sync_reader::SyncReadBridge::new(
self.async_impl().stderr(),
self.rt(),
))
}

/// Returns reference to inner `Runtime`. It's safe to unwrap because it's `Some` until `Container` is dropped.
fn rt(&self) -> &tokio::runtime::Runtime {
&self.inner.as_ref().unwrap().runtime
Expand Down Expand Up @@ -180,7 +200,7 @@ impl<I: Image> Drop for Container<I> {
#[cfg(test)]
mod test {
use super::*;
use crate::core::WaitFor;
use crate::{core::WaitFor, runners::SyncRunner, GenericImage, RunnableImage};

#[derive(Debug, Default)]
pub struct HelloWorld;
Expand All @@ -207,4 +227,45 @@ mod test {
}

fn assert_send_and_sync<T: Send + Sync>() {}

#[test]
fn async_logs_are_accessible() {
let image = GenericImage::new("testcontainers/helloworld", "1.1.0");
let container = RunnableImage::from(image).start();

let mut stderr_lines = container.stderr().lines();

let expected_messages = [
"DELAY_START_MSEC: 0",
"Sleeping for 0 ms",
"Starting server on port 8080",
"Sleeping for 0 ms",
"Starting server on port 8081",
"Ready, listening on 8080 and 8081",
];
for expected_message in expected_messages {
let line = stderr_lines.next().unwrap().unwrap();
assert!(
line.contains(expected_message),
"Log message ('{line}') doesn't contain expected message ('{expected_message}')"
);
}

// logs are accessible after container is stopped
container.stop();

// stdout is empty
let mut stdout = String::new();
container.stdout().read_to_string(&mut stdout).unwrap();
assert_eq!(stdout, "");
// stderr contains 6 lines
let mut stderr = String::new();
container.stderr().read_to_string(&mut stderr).unwrap();
assert_eq!(
stderr.lines().count(),
6,
"unexpected stderr size: {}",
stderr
);
}
}
28 changes: 23 additions & 5 deletions testcontainers/src/core/containers/sync_container/exec.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::{fmt, io};
use std::{fmt, io, io::BufRead};

use crate::core::sync_container::sync_reader;

/// Represents the result of an executed command in a container.
pub struct SyncExecResult<'a> {
Expand All @@ -13,14 +15,30 @@ impl<'a> SyncExecResult<'a> {
self.runtime.block_on(self.inner.exit_code())
}

/// Returns an asynchronous reader for stdout.
pub fn stdout<'b>(&'b mut self) -> Box<dyn BufRead + 'b> {
Box::new(sync_reader::SyncReadBridge::new(
self.inner.stdout(),
self.runtime,
))
}

/// Returns an asynchronous reader for stderr.
pub fn stderr<'b>(&'b mut self) -> Box<dyn BufRead + 'b> {
Box::new(sync_reader::SyncReadBridge::new(
self.inner.stderr(),
self.runtime,
))
}

/// Returns stdout as a vector of bytes.
pub fn stdout(&mut self) -> Result<Vec<u8>, io::Error> {
self.runtime.block_on(self.inner.stdout())
pub fn stdout_to_vec(&mut self) -> Result<Vec<u8>, io::Error> {
self.runtime.block_on(self.inner.stdout_to_vec())
}

/// Returns stderr as a vector of bytes.
pub fn stderr(&mut self) -> Result<Vec<u8>, io::Error> {
self.runtime.block_on(self.inner.stderr())
pub fn stderr_to_vec(&mut self) -> Result<Vec<u8>, io::Error> {
self.runtime.block_on(self.inner.stderr_to_vec())
}
}

Expand Down
64 changes: 64 additions & 0 deletions testcontainers/src/core/containers/sync_container/sync_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use std::io::{BufRead, Read};

use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt};

/// Allows to use [`tokio::io::AsyncRead`] synchronously as [`std::io::Read`].
/// In fact, it's almost the same as [`tokio_util::io::SyncIoBridge`], but utilizes [`tokio::runtime::Runtime`] instead of [`tokio::runtime::Handle`].
/// This is needed because [`tokio::runtime::Handle::block_on`] can't drive the IO on `current_thread` runtime.
pub(super) struct SyncReadBridge<'a, T> {
inner: T,
runtime: &'a tokio::runtime::Runtime,
}

impl<'a, T: Unpin> SyncReadBridge<'a, T> {
pub fn new(inner: T, runtime: &'a tokio::runtime::Runtime) -> Self {
Self { inner, runtime }
}
}

impl<T: AsyncBufRead + Unpin> BufRead for SyncReadBridge<'_, T> {
fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
let inner = &mut self.inner;
self.runtime.block_on(AsyncBufReadExt::fill_buf(inner))
}

fn consume(&mut self, amt: usize) {
let inner = &mut self.inner;
AsyncBufReadExt::consume(inner, amt)
}

fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> std::io::Result<usize> {
let inner = &mut self.inner;
self.runtime
.block_on(AsyncBufReadExt::read_until(inner, byte, buf))
}
fn read_line(&mut self, buf: &mut String) -> std::io::Result<usize> {
let inner = &mut self.inner;
self.runtime
.block_on(AsyncBufReadExt::read_line(inner, buf))
}
}

impl<T: AsyncRead + Unpin> Read for SyncReadBridge<'_, T> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let inner = &mut self.inner;
self.runtime.block_on(AsyncReadExt::read(inner, buf))
}

fn read_to_end(&mut self, buf: &mut Vec<u8>) -> std::io::Result<usize> {
let inner = &mut self.inner;
self.runtime.block_on(inner.read_to_end(buf))
}

fn read_to_string(&mut self, buf: &mut String) -> std::io::Result<usize> {
let inner = &mut self.inner;
self.runtime.block_on(inner.read_to_string(buf))
}

fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
let inner = &mut self.inner;
// The AsyncRead trait returns the count, synchronous doesn't.
let _n = self.runtime.block_on(inner.read_exact(buf))?;
Ok(())
}
}
Loading

0 comments on commit cb65ec2

Please sign in to comment.