diff --git a/Cargo.lock b/Cargo.lock index 239c30f8..a9aeffb2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2932,6 +2932,7 @@ dependencies = [ "testcontainers", "text-splitter", "tokio", + "tokio-stream", "tracing", "tree-sitter", "tree-sitter-javascript", diff --git a/swiftide/Cargo.toml b/swiftide/Cargo.toml index b04ebe97..9813f8aa 100644 --- a/swiftide/Cargo.toml +++ b/swiftide/Cargo.toml @@ -23,6 +23,7 @@ serde = { version = "1.0.203", features = ["derive"] } serde_json = "1.0.117" text-splitter = { version = "0.13.1", features = ["markdown"] } tokio = { version = "1.38.0", features = ["full"] } +tokio-stream = "0.1.15" tracing = { version = "0.1.40", features = ["log"] } strum = "0.26.2" strum_macros = "0.26.4" diff --git a/swiftide/src/ingestion/ingestion_pipeline.rs b/swiftide/src/ingestion/ingestion_pipeline.rs index fe134bf8..de679fa2 100644 --- a/swiftide/src/ingestion/ingestion_pipeline.rs +++ b/swiftide/src/ingestion/ingestion_pipeline.rs @@ -3,7 +3,7 @@ use anyhow::Result; use futures_util::{StreamExt, TryStreamExt}; use tracing::Instrument; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use super::IngestionStream; @@ -228,6 +228,14 @@ impl IngestionPipeline { self } + /// Throttles the stream of nodes, limiting the rate to 1 per duration. + /// + /// Useful for rate limiting the ingestion pipeline. Uses tokio_stream::StreamExt::throttle internally which has a granualarity of 1ms. + pub fn throttle(mut self, duration: impl Into) -> Self { + self.stream = tokio_stream::StreamExt::throttle(self.stream, duration.into()).boxed(); + self + } + // Silently filters out errors encountered by the pipeline. // // This method filters out errors encountered by the pipeline, preventing them from bubbling up and terminating the stream.