Skip to content

Commit

Permalink
feat(query): Improve query performance similar to indexing in 0.12
Browse files Browse the repository at this point in the history
  • Loading branch information
timonv committed Sep 16, 2024
1 parent 01cf579 commit 081a248
Showing 1 changed file with 23 additions and 10 deletions.
33 changes: 23 additions & 10 deletions swiftide-query/src/query/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//!
//! A query pipeline is lazy and only runs when query is called.
use futures_util::TryFutureExt as _;
use std::sync::Arc;
use swiftide_core::{
prelude::*,
Expand Down Expand Up @@ -107,9 +108,13 @@ where
let transformer = Arc::clone(&transformer);
let span = tracing::trace_span!("then_transform_query", query = ?query);

async move { transformer.transform_query(query).await }.instrument(span)
tokio::spawn(
async move { transformer.transform_query(query).await }.instrument(span),
)
.err_into::<anyhow::Error>()
})
.try_buffer_unordered(default_concurrency);
.try_buffer_unordered(default_concurrency)
.map(|x| x.and_then(|x| x));

Pipeline {
stream: new_stream.boxed().into(),
Expand Down Expand Up @@ -147,7 +152,7 @@ impl<'stream: 'static, S: SearchStrategy + 'stream> Pipeline<'stream, S, states:
let span = tracing::trace_span!("then_retrieve", query = ?query);
let evaluator_for_stream = evaluator_for_stream.clone();

async move {
tokio::spawn(async move {
let result = retriever.retrieve(&search_strategy, query).await?;

if let Some(evaluator) = evaluator_for_stream.as_ref() {
Expand All @@ -156,10 +161,12 @@ impl<'stream: 'static, S: SearchStrategy + 'stream> Pipeline<'stream, S, states:
} else {
Ok(result)
}
}
})
.instrument(span)
.err_into::<anyhow::Error>()
})
.try_buffer_unordered(default_concurrency);
.try_buffer_unordered(default_concurrency)
.map(|x| x.and_then(|x| x));

Pipeline {
stream: new_stream.boxed().into(),
Expand Down Expand Up @@ -191,9 +198,12 @@ impl<'stream: 'static, S: SearchStrategy> Pipeline<'stream, S, states::Retrieved
.map_ok(move |query| {
let transformer = Arc::clone(&transformer);
let span = tracing::trace_span!("then_transform_response", query = ?query);
async move { transformer.transform_response(query).await }.instrument(span)
tokio::spawn(async move { transformer.transform_response(query).await })
.instrument(span)
.err_into::<anyhow::Error>()
})
.try_buffer_unordered(default_concurrency);
.try_buffer_unordered(default_concurrency)
.map(|x| x.and_then(|x| x));

Pipeline {
stream: new_stream.boxed().into(),
Expand Down Expand Up @@ -228,18 +238,21 @@ impl<'stream: 'static, S: SearchStrategy> Pipeline<'stream, S, states::Retrieved
let span = tracing::trace_span!("then_answer", query = ?query);
let evaluator_for_stream = evaluator_for_stream.clone();

async move {
tokio::spawn(async move {
let result = answerer.answer(query).await?;
if let Some(evaluator) = evaluator_for_stream.as_ref() {
evaluator.evaluate(result.clone().into()).await?;
Ok(result)
} else {
Ok(result)
}
}
})
.instrument(span)
.err_into::<anyhow::Error>()
})
.try_buffer_unordered(default_concurrency);
.try_buffer_unordered(default_concurrency)
.map(|x| x.and_then(|x| x));

Pipeline {
stream: new_stream.boxed().into(),
search_strategy,
Expand Down

0 comments on commit 081a248

Please sign in to comment.