Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Not all logs are captured when a container is dropped #719

Open
inahga opened this issue Aug 7, 2024 · 3 comments
Open

Not all logs are captured when a container is dropped #719

inahga opened this issue Aug 7, 2024 · 3 comments

Comments

@inahga
Copy link

inahga commented Aug 7, 2024

When a container is created with a LogConsumer, the ContainerAsync does not wait for the logs to stop while dropping. If the process ends quickly, e.g. when running a single cargo test, some logs may not be captured by the time the process ends.

See this example failing test. Note that by its nature it's a flaky test, so you may need to increase the $(seq 20) argument to reproduce it, though on my relatively fast machine the counter never gets past 1 or 2.

#[cfg(test)]
mod tests {
    use std::{
        ops::AddAssign,
        sync::{Arc, Mutex},
    };

    use futures::{future::BoxFuture, FutureExt};
    use testcontainers::{
        core::logs::{consumer::LogConsumer, LogFrame},
        runners::AsyncRunner,
        GenericImage, ImageExt,
    };

    struct TestLogConsumer(Arc<Mutex<u64>>);

    impl LogConsumer for TestLogConsumer {
        fn accept<'a>(&'a self, _: &'a LogFrame) -> BoxFuture<'a, ()> {
            async move {
                self.0.lock().unwrap().add_assign(1);
            }
            .boxed()
        }
    }

    #[tokio::test]
    async fn bad_test() {
        let counter = Arc::new(Mutex::new(0u64));

        {
            GenericImage::new("docker.io/library/ubuntu", "latest")
                .with_cmd(["/bin/bash", "-c", "for i in $(seq 20); do echo $i; done"])
                .with_log_consumer(TestLogConsumer(Arc::clone(&counter)))
                .start()
                .await
                .unwrap();
        }

        assert_eq!(counter.lock().unwrap().to_owned(), 20);
    }
}

I'm able to workaround this by wrapping the container and avoiding using LogConsumer at all:

#[cfg(test)]
mod tests {
    use std::{
        ops::AddAssign,
        pin::Pin,
        sync::{Arc, Mutex},
    };

    use testcontainers::{runners::AsyncRunner, ContainerAsync, GenericImage, ImageExt};
    use tokio::{
        io::{AsyncBufRead, AsyncBufReadExt},
        runtime::Handle,
        task::JoinHandle,
    };

    type Buffer = Pin<Box<dyn AsyncBufRead + Send>>;

    struct TestLogConsumer {
        stdout_handle: Option<JoinHandle<()>>,
        stderr_handle: Option<JoinHandle<()>>,
    }

    impl TestLogConsumer {
        fn new(counter: Arc<Mutex<u64>>, stdout: Buffer, stderr: Buffer) -> Self {
            let (stdout_counter, stderr_counter) = (Arc::clone(&counter), Arc::clone(&counter));

            let stdout_handle = Some(tokio::spawn(async move {
                let mut lines = stdout.lines();
                while let Ok(Some(_)) = lines.next_line().await {
                    stdout_counter.lock().unwrap().add_assign(1);
                }
            }));

            let stderr_handle = Some(tokio::spawn(async move {
                let mut lines = stderr.lines();
                while let Ok(Some(_)) = lines.next_line().await {
                    stderr_counter.lock().unwrap().add_assign(1);
                }
            }));

            Self {
                stdout_handle,
                stderr_handle,
            }
        }
    }

    impl Drop for TestLogConsumer {
        fn drop(&mut self) {
            let handle = Handle::current();
            let stdout_handle = self.stdout_handle.take();
            let stderr_handle = self.stderr_handle.take();
            tokio::task::block_in_place(|| {
                stdout_handle.map(|task| handle.block_on(task));
                stderr_handle.map(|task| handle.block_on(task));
            });
        }
    }

    struct TestContainer {
        container: ContainerAsync<GenericImage>,
        log_consumer: TestLogConsumer,
    }

    impl TestContainer {
        async fn new(counter: Arc<Mutex<u64>>) -> Self {
            let container = GenericImage::new("docker.io/library/ubuntu", "latest")
                .with_cmd(["/bin/bash", "-c", "for i in $(seq 20); do echo $i; done"])
                .start()
                .await
                .unwrap();

            let log_consumer =
                TestLogConsumer::new(counter, container.stdout(true), container.stderr(true));

            TestContainer {
                container,
                log_consumer,
            }
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn good_test() {
        let counter = Arc::new(Mutex::new(0u64));

        {
            TestContainer::new(Arc::clone(&counter)).await;
        }

        assert_eq!(counter.lock().unwrap().to_owned(), 20);
    }
}

But that's unfortunate. I think if someone is using a LogConsumer they'd prefer that all logs are captured.

I think the problem is that the log consumer task is created with tokio::spawn, and the corresponding handle is never joined. https://github.com/testcontainers/testcontainers-rs/blob/main/testcontainers/src/core/containers/async_container.rs#L71.

I'm more than happy to fix this myself, but wanted to gauge maintainer interest first, since adding more blocking calls to a Drop is generally unpleasant.

@divergentdave
Copy link
Contributor

Beyond waiting for the log streams to finish, we may want to first request that Docker stop the container, and then only delete the container after waiting for log streams. Currently, dropping the container requests deletion right away, and I think there may be a race between log streams ending due to container deletion versus log streams ending due to the process exiting.

@DDtKey
Copy link
Collaborator

DDtKey commented Aug 11, 2024

Hi there 👋
Thanks for the detailed description of the issue.

This is actually a good question. Since we allow custom consumers, this can result in significant delays per drop (for example, many nested consumers).

Perhaps, an explicit interface to enforce the behavior instead (e.g JoinedConsumer<Inner> or consumer.join()) could be an option.

However, the important part is that this behavior is likely desirable in most cases. Thus, I think it’s reasonable to have them joined/awaited on drop. Mentioning that custom implementation of the trait may affect drop.

@DDtKey
Copy link
Collaborator

DDtKey commented Aug 11, 2024

Currently, dropping the container requests deletion right away, and I think there may be a race between log streams ending due to container deletion versus log streams ending due to the process exiting.

That's a good point too. In general, it also depends on the container. Usually, there is graceful shutdown logic and default timeouts should cover that. But we can request stop prior to deletion just to ensure container's stopped.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants