
Combining a tower Service with a tonic client panics #547

Closed
iffyio opened this issue Jan 26, 2021 · 7 comments
Comments

@iffyio

iffyio commented Jan 26, 2021

prost = "0.7.0"
http = "0.2"
tokio = { version = "=1.1.0", features = ["full"] }
tonic = { version = "0.4.0", features = ["tls", "tls-roots" ]}
tower = "0.4.4"

Hi, I get the following panic while updating my deps to tokio v1 (and tonic/tower along with it):

panicked at 'buffer full; poll_ready must be called first'

Can confirm that poll_ready was in fact called and returned Poll::Ready(Ok(())) before call was invoked.
Looking at the code, it seems related to cloning a Semaphore, which resets its state to State::Empty, so call now always runs with State::Empty even though a permit was acquired successfully? Or has something fundamentally changed in 0.4 regarding how this works?

Here is a small example scenario that hits this issue and hopefully illustrates the setup:

// Imports assumed for this snippet (tonic 0.4 / tower 0.4 paths);
// MyGrpcClient and RpcPayload stand in for the generated client/message types.
use std::{future::Future, pin::Pin, task::{Context, Poll}};
use http::{Request, Response};
use tonic::body::BoxBody;
use tonic::transport::{Body, Channel, ClientTlsConfig};
use tower::Service;

#[derive(Clone)]
pub struct Svc(Channel);

impl Service<Request<BoxBody>> for Svc {
    type Response = Response<Body>;
    type Error = Box<dyn std::error::Error + Send + Sync>;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.0.poll_ready(cx).map_err(Into::into)
    }

    fn call(&mut self, req: Request<BoxBody>) -> Self::Future {
        let mut inner = self.0.clone();
        Box::pin(async move {
            inner.call(req).await.map_err(Into::into)
        })
    }
}

#[tokio::main]
async fn main() {
    type Client = MyGrpcClient<Svc>;
    let domain = "example.com";
    let tls_config = ClientTlsConfig::new().domain_name(domain);
    let conn = Channel::from_shared("https://".to_owned() + domain)
        .unwrap()
        .tls_config(tls_config)
        .unwrap()
        .connect()
        .await
        .unwrap();

    Client::new(Svc(conn)).rpc_request(RpcPayload {}).await.unwrap();
}
@davidpdrsn
Member

Hm, yeah, that does look odd. I'm not very familiar with this part of the code, but it seems odd to me that Semaphore is Clone in the first place. I would expect it not to be, requiring an Arc to make it cloneable. tokio::sync::Semaphore, for reference, is not Clone. However, it also doesn't have any methods that take &mut self, so I'm not sure that's possible for tower's Semaphore.

@hawkw Do you know?

@olix0r
Collaborator

olix0r commented Jan 26, 2021

impl Service<Request<BoxBody>> for Svc {
    type Response = Response<Body>;
    type Error = Box<dyn std::error::Error + Send + Sync>;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.0.poll_ready(cx).map_err(Into::into)
    }

    fn call(&mut self, req: Request<BoxBody>) -> Self::Future {
        let mut inner = self.0.clone();
        Box::pin(async move {
            inner.call(req).await.map_err(Into::into)
        })
    }
}

This subtly breaks the contract: the service is driven to readiness and then cloned before it is invoked. The original service is ready, but the clone is not necessarily ready.

To fix this, the call function could be rewritten as:

       Box::pin(self.0.call(req).err_into::<Error>())

(using futures::TryFutureExt::err_into), which avoids the cloning entirely.

If cloning is really necessary, you could clone first and then mem::replace to "take" the ready service and leave the clone in its place: let clone = self.0.clone(); let mut inner = std::mem::replace(&mut self.0, clone); (cloning inline in the mem::replace call won't borrow-check, since self.0 would be borrowed mutably and immutably at once).

@iffyio
Author

iffyio commented Jan 27, 2021

Ah, I see. It's unfortunate that this accidentally worked. I initially thought of the replace alternative, but that felt like a hack. It would've been nice to have an API that didn't allow this (e.g. by not making things cloneable) rather than panicking 🤔 Thanks for clarifying!

@iffyio iffyio closed this as completed Jan 27, 2021
@davidpdrsn
Member

Uhh, that's really subtle and not something I had thought of. I think it makes sense to mention it in the Service docs. I added a PR for that: #548.

@davidpdrsn
Member

Some more context for people who might discover this in the future:

I have tried implementing a version of Buffer that doesn't have this problem, but without success. The root problem is that in poll_ready we want to reserve space in the channel used to send requests to the background worker, and then use that reserved space in call. This leads to a few challenges.

It is currently done using a semaphore, but could in theory also be done using tokio::sync::mpsc::Sender::reserve. Both methods give you some kind of permit, that is, something that proves you have reserved capacity in the channel. So in poll_ready we obtain such a permit and then use it in call. That means we need to store the permit on Buffer to carry it from poll_ready to call. However, these permits cannot be Clone, since that would allow sending without reserving capacity first. So if Buffer stores such a permit, it cannot be Clone either, which breaks one of the main use cases of Buffer: making services Clone.

The current implementation gets around this by having Semaphore store the permit internally, with call taking that permit and panicking if for some reason there isn't one. As mentioned, Semaphores are made Clone by discarding the permit they store internally.

We could just ignore poll_ready and both reserve and use the channel capacity in call, but then other middleware could not check a Buffer's readiness without also calling it. That would break things like load balancers, so it isn't acceptable either.

I think the "token" solution suggested here could fix this. We would probably be able to make Buffer::Token the permit and require users to pass it to call, thus removing the need for Buffer to store the permit internally.

Arqu added a commit to n0-computer/iroh that referenced this issue Oct 6, 2022
Arqu added a commit to n0-computer/iroh that referenced this issue Oct 17, 2022
Arqu added a commit to n0-computer/iroh that referenced this issue Oct 18, 2022
@0xAlcibiades

0xAlcibiades commented Sep 21, 2023

Has anyone looked into a fix for this here in tower, or in tonic?

@erichulburd

erichulburd commented Jun 27, 2024

Is there an issue with using std::future::poll_fn within the Pin<Box<dyn Future>>? For instance, something like the following:

impl Service<Request<BoxBody>> for Svc {
    type Response = Response<Body>;
    type Error = Box<dyn std::error::Error + Send + Sync>;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.0.poll_ready(cx).map_err(Into::into)
    }

    fn call(&mut self, req: Request<BoxBody>) -> Self::Future {
        let mut inner = self.0.clone();
        Box::pin(async move {
            // Drive the clone to readiness first (`.and_then` can't `.await`
            // inside its closure, so the two steps are sequenced with `?`).
            std::future::poll_fn(|cx| inner.poll_ready(cx)).await?;
            inner.call(req).await.map_err(Into::into)
        })
    }
}

I guess I should mention that my real use case is implementing retries within call, which requires invoking inner.call().await multiple times. Some preliminary testing has checked out, but I'm not sure how well it would hold up in a more concurrent environment. Here's a hand-wavy idea of what I'd be doing:

    fn call(&mut self, req: Request<BoxBody>) -> Self::Future {
        let mut inner = self.0.clone();
        Box::pin(async move {
            let mut last_err = None;
            for _ in 0..3 {
                // Drive the clone to readiness before each attempt.
                std::future::poll_fn(|cx| inner.poll_ready(cx)).await?;
                // `clone_request` is a hypothetical helper: retrying requires
                // rebuilding the request per attempt, since http requests
                // aren't Clone (hence the tonic issue linked below).
                match inner.call(clone_request(&req)).await {
                    Ok(res) => return Ok(res),
                    Err(e) => last_err = Some(e.into()),
                }
            }
            Err(last_err.unwrap())
        })
    }

This is a stop-gap for a more fully baked version of ergonomic retries (#682) and a tonic version that relies on an http crate with cloneable requests (see hyperium/tonic#733).
