From 80299269054eb440e55a42667a7bcc9ba6514a7b Mon Sep 17 00:00:00 2001 From: Timon Vonk Date: Sun, 15 Sep 2024 17:48:51 +0200 Subject: [PATCH] feat(query,indexing): Add duration in log output on pipeline completion --- swiftide-indexing/src/pipeline.rs | 25 ++++++++----- swiftide-query/src/query/pipeline.rs | 52 ++++++++++++++++++++++------ 2 files changed, 57 insertions(+), 20 deletions(-) diff --git a/swiftide-indexing/src/pipeline.rs b/swiftide-indexing/src/pipeline.rs index f02977a3..f8d1c40f 100644 --- a/swiftide-indexing/src/pipeline.rs +++ b/swiftide-indexing/src/pipeline.rs @@ -145,11 +145,11 @@ impl Pipeline { tracing::trace_span!("filter_cached", node_cache = ?cache, node = ?node ); async move { if cache.get(&node).await { - tracing::info!(node_cache = cache.name(), "Node in cache, skipping"); + tracing::debug!(node = ?node, node_cache = cache.name(), "Node in cache, skipping"); Ok(None) } else { cache.set(&node).await; - tracing::info!(node_cache = cache.name(), "Node not in cache, processing"); + tracing::debug!(node = ?node, node_cache = cache.name(), "Node not in cache, processing"); Ok(Some(node)) } } @@ -185,10 +185,10 @@ impl Pipeline { .stream .map_ok(move |node| { let transformer = transformer.clone(); - let span = tracing::trace_span!("then", node = ?node ); + let span = tracing::trace_span!("then", node = ?node); task::spawn(async move { - tracing::info!(transformer = transformer.name(), "Transforming node"); + tracing::debug!(node = ?node, transformer = transformer.name(), "Transforming node"); transformer.transform_node(node).await }) .instrument(span) @@ -233,7 +233,7 @@ impl Pipeline { let span = tracing::trace_span!("then_in_batch", nodes = ?nodes ); tokio::spawn(async move { - tracing::info!( + tracing::debug!( batch_transformer = transformer.name(), num_nodes = nodes.len(), "Batch transforming nodes" @@ -271,7 +271,7 @@ impl Pipeline { let span = tracing::trace_span!("then_chunk", chunker = ?chunker, node = ?node ); tokio::spawn(async move { - tracing::info!(chunker = chunker.name(), "Chunking node"); + tracing::debug!(chunker = chunker.name(), "Chunking node"); chunker.transform_node(node).await }) .instrument(span) @@ -314,7 +314,7 @@ impl Pipeline { let span = tracing::trace_span!("then_store_with_batched", storage = ?storage, nodes = ?nodes ); tokio::spawn(async move { - tracing::info!(storage = storage.name(), num_nodes = nodes.len(), "Batch Storing nodes"); + tracing::debug!(storage = storage.name(), num_nodes = nodes.len(), "Batch Storing nodes"); storage.batch_store(nodes).await }) .instrument(span) @@ -334,7 +334,7 @@ impl Pipeline { tracing::trace_span!("then_store_with", storage = ?storage, node = ?node ); tokio::spawn(async move { - tracing::info!(storage = storage.name(), "Storing node"); + tracing::debug!(storage = storage.name(), "Storing node"); storage.store(node).await }) @@ -537,6 +537,7 @@ impl Pipeline { "Starting indexing pipeline with {} concurrency", self.concurrency ); + let now = std::time::Instant::now(); if self.storage.is_empty() { anyhow::bail!("No storage configured for indexing pipeline"); } @@ -554,7 +555,13 @@ impl Pipeline { total_nodes += 1; } - tracing::warn!("Processed {} nodes", total_nodes); + let elapsed_in_seconds = now.elapsed().as_secs(); + tracing::warn!( + elapsed_in_seconds, + "Processed {} nodes in {} seconds", + total_nodes, + elapsed_in_seconds + ); tracing::Span::current().record("total_nodes", total_nodes); Ok(()) diff --git a/swiftide-query/src/query/pipeline.rs b/swiftide-query/src/query/pipeline.rs index 8cb77ff0..d3cd10fe 100644 --- a/swiftide-query/src/query/pipeline.rs +++ b/swiftide-query/src/query/pipeline.rs @@ -110,7 +110,7 @@ where tokio::spawn( async move { - tracing::info!( + tracing::debug!( query_transformer = transformer.name(), "Transforming query" ); @@ -160,7 +160,7 @@ impl<'stream: 'static, S: SearchStrategy + 'stream> Pipeline<'stream, S, states: let evaluator_for_stream = evaluator_for_stream.clone(); tokio::spawn(async move { - tracing::info!(retriever = retriever.name(), "Retrieving documents"); + tracing::debug!(retriever = retriever.name(), "Retrieving documents"); let result = retriever.retrieve(&search_strategy, query).await?; @@ -208,7 +208,7 @@ impl<'stream: 'static, S: SearchStrategy> Pipeline<'stream, S, states::Retrieved let transformer = Arc::clone(&transformer); let span = tracing::trace_span!("then_transform_response", query = ?query); tokio::spawn(async move { - tracing::info!( + tracing::debug!( response_transformer = transformer.name(), "Transforming response" ); @@ -254,7 +254,7 @@ impl<'stream: 'static, S: SearchStrategy> Pipeline<'stream, S, states::Retrieved let evaluator_for_stream = evaluator_for_stream.clone(); tokio::spawn(async move { - tracing::info!(answerer = answerer.name(), "Answering query"); + tracing::debug!(answerer = answerer.name(), "Answering query"); let result = answerer.answer(query).await?; if let Some(evaluator) = evaluator_for_stream.as_ref() { @@ -290,13 +290,23 @@ impl Pipeline<'_, S, states::Answered> { mut self, query: impl Into>, ) -> Result> { - tracing::info!("Sending query"); + tracing::debug!("Sending query"); + let now = std::time::Instant::now(); self.query_sender.send(Ok(query.into())).await?; - self.stream.try_next().await?.ok_or_else(|| { + let answer = self.stream.try_next().await?.ok_or_else(|| { anyhow::anyhow!("Pipeline did not receive a response from the query stream") - }) + }); + + let elapsed_in_seconds = now.elapsed().as_secs(); + tracing::warn!( + elapsed_in_seconds, + "Answered query in {} seconds", + elapsed_in_seconds + ); + + answer } /// Runs the pipeline with a user query, accepts `&str` as well. @@ -311,18 +321,29 @@ impl Pipeline<'_, S, states::Answered> { &mut self, query: impl Into>, ) -> Result> { - tracing::info!("Sending query"); + tracing::warn!("Sending query"); + let now = std::time::Instant::now(); self.query_sender.send(Ok(query.into())).await?; - self.stream + let answer = self + .stream .by_ref() .take(1) .try_next() .await? .ok_or_else(|| { anyhow::anyhow!("Pipeline did not receive a response from the query stream") - }) + }); + + let elapsed_in_seconds = now.elapsed().as_secs(); + tracing::warn!( + elapsed_in_seconds, + "Answered query in {} seconds", + elapsed_in_seconds + ); + + answer } /// Runs the pipeline with multiple queries @@ -335,7 +356,8 @@ impl Pipeline<'_, S, states::Answered> { self, queries: Vec> + Clone>, ) -> Result>> { - tracing::info!("Sending queries"); + tracing::warn!("Sending queries"); + let now = std::time::Instant::now(); let Pipeline { query_sender, @@ -356,6 +378,14 @@ impl Pipeline<'_, S, states::Answered> { break; } } + + let elapsed_in_seconds = now.elapsed().as_secs(); + tracing::warn!( + num_queries = queries.len(), + elapsed_in_seconds, + "Answered all queries in {} seconds", + elapsed_in_seconds + ); Ok(results) } }