Skip to content

Commit

Permalink
feat(query,indexing): Add duration in log output on pipeline completion
Browse files Browse the repository at this point in the history
  • Loading branch information
timonv committed Sep 16, 2024
1 parent a871c61 commit 8029926
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 20 deletions.
25 changes: 16 additions & 9 deletions swiftide-indexing/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ impl Pipeline {
tracing::trace_span!("filter_cached", node_cache = ?cache, node = ?node );
async move {
if cache.get(&node).await {
tracing::info!(node_cache = cache.name(), "Node in cache, skipping");
tracing::debug!(node = ?node, node_cache = cache.name(), "Node in cache, skipping");
Ok(None)
} else {
cache.set(&node).await;
tracing::info!(node_cache = cache.name(), "Node not in cache, processing");
tracing::debug!(node = ?node, node_cache = cache.name(), "Node not in cache, processing");
Ok(Some(node))
}
}
Expand Down Expand Up @@ -185,10 +185,10 @@ impl Pipeline {
.stream
.map_ok(move |node| {
let transformer = transformer.clone();
let span = tracing::trace_span!("then", node = ?node );
let span = tracing::trace_span!("then", node = ?node);

task::spawn(async move {
tracing::info!(transformer = transformer.name(), "Transforming node");
tracing::debug!(node = ?node, transformer = transformer.name(), "Transforming node");
transformer.transform_node(node).await
})
.instrument(span)
Expand Down Expand Up @@ -233,7 +233,7 @@ impl Pipeline {
let span = tracing::trace_span!("then_in_batch", nodes = ?nodes );

tokio::spawn(async move {
tracing::info!(
tracing::debug!(
batch_transformer = transformer.name(),
num_nodes = nodes.len(),
"Batch transforming nodes"
Expand Down Expand Up @@ -271,7 +271,7 @@ impl Pipeline {
let span = tracing::trace_span!("then_chunk", chunker = ?chunker, node = ?node );

tokio::spawn(async move {
tracing::info!(chunker = chunker.name(), "Chunking node");
tracing::debug!(chunker = chunker.name(), "Chunking node");
chunker.transform_node(node).await
})
.instrument(span)
Expand Down Expand Up @@ -314,7 +314,7 @@ impl Pipeline {
let span = tracing::trace_span!("then_store_with_batched", storage = ?storage, nodes = ?nodes );

tokio::spawn(async move {
tracing::info!(storage = storage.name(), num_nodes = nodes.len(), "Batch Storing nodes");
tracing::debug!(storage = storage.name(), num_nodes = nodes.len(), "Batch Storing nodes");
storage.batch_store(nodes).await
})
.instrument(span)
Expand All @@ -334,7 +334,7 @@ impl Pipeline {
tracing::trace_span!("then_store_with", storage = ?storage, node = ?node );

tokio::spawn(async move {
tracing::info!(storage = storage.name(), "Storing node");
tracing::debug!(storage = storage.name(), "Storing node");

storage.store(node).await
})
Expand Down Expand Up @@ -537,6 +537,7 @@ impl Pipeline {
"Starting indexing pipeline with {} concurrency",
self.concurrency
);
let now = std::time::Instant::now();
if self.storage.is_empty() {
anyhow::bail!("No storage configured for indexing pipeline");
}
Expand All @@ -554,7 +555,13 @@ impl Pipeline {
total_nodes += 1;
}

tracing::warn!("Processed {} nodes", total_nodes);
let elapsed_in_seconds = now.elapsed().as_secs();
tracing::warn!(
elapsed_in_seconds,
"Processed {} nodes in {} seconds",
total_nodes,
elapsed_in_seconds
);
tracing::Span::current().record("total_nodes", total_nodes);

Ok(())
Expand Down
52 changes: 41 additions & 11 deletions swiftide-query/src/query/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ where

tokio::spawn(
async move {
tracing::info!(
tracing::debug!(
query_transformer = transformer.name(),
"Transforming query"
);
Expand Down Expand Up @@ -160,7 +160,7 @@ impl<'stream: 'static, S: SearchStrategy + 'stream> Pipeline<'stream, S, states:
let evaluator_for_stream = evaluator_for_stream.clone();

tokio::spawn(async move {
tracing::info!(retriever = retriever.name(), "Retrieving documents");
tracing::debug!(retriever = retriever.name(), "Retrieving documents");

let result = retriever.retrieve(&search_strategy, query).await?;

Expand Down Expand Up @@ -208,7 +208,7 @@ impl<'stream: 'static, S: SearchStrategy> Pipeline<'stream, S, states::Retrieved
let transformer = Arc::clone(&transformer);
let span = tracing::trace_span!("then_transform_response", query = ?query);
tokio::spawn(async move {
tracing::info!(
tracing::debug!(
response_transformer = transformer.name(),
"Transforming response"
);
Expand Down Expand Up @@ -254,7 +254,7 @@ impl<'stream: 'static, S: SearchStrategy> Pipeline<'stream, S, states::Retrieved
let evaluator_for_stream = evaluator_for_stream.clone();

tokio::spawn(async move {
tracing::info!(answerer = answerer.name(), "Answering query");
tracing::debug!(answerer = answerer.name(), "Answering query");
let result = answerer.answer(query).await?;

if let Some(evaluator) = evaluator_for_stream.as_ref() {
Expand Down Expand Up @@ -290,13 +290,23 @@ impl<S: SearchStrategy> Pipeline<'_, S, states::Answered> {
mut self,
query: impl Into<Query<states::Pending>>,
) -> Result<Query<states::Answered>> {
tracing::info!("Sending query");
tracing::debug!("Sending query");
let now = std::time::Instant::now();

self.query_sender.send(Ok(query.into())).await?;

self.stream.try_next().await?.ok_or_else(|| {
let answer = self.stream.try_next().await?.ok_or_else(|| {
anyhow::anyhow!("Pipeline did not receive a response from the query stream")
})
});

let elapsed_in_seconds = now.elapsed().as_secs();
tracing::warn!(
elapsed_in_seconds,
"Answered query in {} seconds",
elapsed_in_seconds
);

answer
}

/// Runs the pipeline with a user query, accepts `&str` as well.
Expand All @@ -311,18 +321,29 @@ impl<S: SearchStrategy> Pipeline<'_, S, states::Answered> {
&mut self,
query: impl Into<Query<states::Pending>>,
) -> Result<Query<states::Answered>> {
tracing::info!("Sending query");
tracing::warn!("Sending query");
let now = std::time::Instant::now();

self.query_sender.send(Ok(query.into())).await?;

self.stream
let answer = self
.stream
.by_ref()
.take(1)
.try_next()
.await?
.ok_or_else(|| {
anyhow::anyhow!("Pipeline did not receive a response from the query stream")
})
});

let elapsed_in_seconds = now.elapsed().as_secs();
tracing::warn!(
elapsed_in_seconds,
"Answered query in {} seconds",
elapsed_in_seconds
);

answer
}

/// Runs the pipeline with multiple queries
Expand All @@ -335,7 +356,8 @@ impl<S: SearchStrategy> Pipeline<'_, S, states::Answered> {
self,
queries: Vec<impl Into<Query<states::Pending>> + Clone>,
) -> Result<Vec<Query<states::Answered>>> {
tracing::info!("Sending queries");
tracing::warn!("Sending queries");
let now = std::time::Instant::now();

let Pipeline {
query_sender,
Expand All @@ -356,6 +378,14 @@ impl<S: SearchStrategy> Pipeline<'_, S, states::Answered> {
break;
}
}

let elapsed_in_seconds = now.elapsed().as_secs();
tracing::warn!(
num_queries = queries.len(),
elapsed_in_seconds,
"Answered all queries in {} seconds",
elapsed_in_seconds
);
Ok(results)
}
}
Expand Down

0 comments on commit 8029926

Please sign in to comment.