diff --git a/swiftide-query/src/query/pipeline.rs b/swiftide-query/src/query/pipeline.rs index ff273767..a71d76d3 100644 --- a/swiftide-query/src/query/pipeline.rs +++ b/swiftide-query/src/query/pipeline.rs @@ -13,6 +13,7 @@ //! //! A query pipeline is lazy and only runs when query is called. +use futures_util::TryFutureExt as _; use std::sync::Arc; use swiftide_core::{ prelude::*, @@ -107,9 +108,13 @@ where let transformer = Arc::clone(&transformer); let span = tracing::trace_span!("then_transform_query", query = ?query); - async move { transformer.transform_query(query).await }.instrument(span) + tokio::spawn( + async move { transformer.transform_query(query).await }.instrument(span), + ) + .err_into::() }) - .try_buffer_unordered(default_concurrency); + .try_buffer_unordered(default_concurrency) + .map(|x| x.and_then(|x| x)); Pipeline { stream: new_stream.boxed().into(), @@ -147,7 +152,7 @@ impl<'stream: 'static, S: SearchStrategy + 'stream> Pipeline<'stream, S, states: let span = tracing::trace_span!("then_retrieve", query = ?query); let evaluator_for_stream = evaluator_for_stream.clone(); - async move { + tokio::spawn(async move { let result = retriever.retrieve(&search_strategy, query).await?; if let Some(evaluator) = evaluator_for_stream.as_ref() { @@ -156,10 +161,12 @@ impl<'stream: 'static, S: SearchStrategy + 'stream> Pipeline<'stream, S, states: } else { Ok(result) } - } + }) .instrument(span) + .err_into::() }) - .try_buffer_unordered(default_concurrency); + .try_buffer_unordered(default_concurrency) + .map(|x| x.and_then(|x| x)); Pipeline { stream: new_stream.boxed().into(), @@ -191,9 +198,12 @@ impl<'stream: 'static, S: SearchStrategy> Pipeline<'stream, S, states::Retrieved .map_ok(move |query| { let transformer = Arc::clone(&transformer); let span = tracing::trace_span!("then_transform_response", query = ?query); - async move { transformer.transform_response(query).await }.instrument(span) + tokio::spawn(async move { transformer.transform_response(query).await }) + .instrument(span) + .err_into::() }) - .try_buffer_unordered(default_concurrency); + .try_buffer_unordered(default_concurrency) + .map(|x| x.and_then(|x| x)); Pipeline { stream: new_stream.boxed().into(), @@ -228,7 +238,7 @@ impl<'stream: 'static, S: SearchStrategy> Pipeline<'stream, S, states::Retrieved let span = tracing::trace_span!("then_answer", query = ?query); let evaluator_for_stream = evaluator_for_stream.clone(); - async move { + tokio::spawn(async move { let result = answerer.answer(query).await?; if let Some(evaluator) = evaluator_for_stream.as_ref() { evaluator.evaluate(result.clone().into()).await?; @@ -236,10 +246,13 @@ impl<'stream: 'static, S: SearchStrategy> Pipeline<'stream, S, states::Retrieved } else { Ok(result) } - } + }) .instrument(span) + .err_into::() }) - .try_buffer_unordered(default_concurrency); + .try_buffer_unordered(default_concurrency) + .map(|x| x.and_then(|x| x)); + Pipeline { stream: new_stream.boxed().into(), search_strategy,