Skip to content

Commit

Permalink
feat(ingestion_pipeline): splitting and merging streams
Browse files Browse the repository at this point in the history
  • Loading branch information
timonv committed Jun 30, 2024
1 parent 2650605 commit 5aeb3a7
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 1 deletion.
134 changes: 134 additions & 0 deletions swiftide/src/ingestion/ingestion_pipeline.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{BatchableTransformer, ChunkerTransformer, Loader, NodeCache, Persist, Transformer};
use anyhow::Result;
use futures_util::{StreamExt, TryStreamExt};
use tokio::sync::mpsc;
use tracing::Instrument;

use std::{sync::Arc, time::Duration};
Expand Down Expand Up @@ -232,6 +233,74 @@ impl IngestionPipeline {
self
}

/// Splits the stream into two streams based on a predicate.
///
/// Note that this is not lazy. It will start consuming the stream immediately
/// and send each item to the left or right stream based on the predicate.
///
/// The other streams have a buffer, but should be started as soon as possible.
///
/// They can either be run concurrently, alternated between or merged back together.
pub fn split_by<P>(self, predicate: P) -> (Self, Self)
where
P: Fn(&Result<IngestionNode>) -> bool + Send + Sync + 'static,
{
let predicate = Arc::new(predicate);

let (left_tx, left_rx) = mpsc::channel(1000);
let (right_tx, right_rx) = mpsc::channel(1000);

let stream = self.stream;
let span = tracing::trace_span!("split_by");
tokio::spawn(async move {
stream
.for_each(move |item| {
let predicate = Arc::clone(&predicate);
let left_tx = left_tx.clone();
let right_tx = right_tx.clone();
async move {
if predicate(&item) {
tracing::debug!(?item, "Sending to left stream");
left_tx.send(item).await.unwrap();
} else {
tracing::debug!(?item, "Sending to right stream");
right_tx.send(item).await.unwrap();
}
}
})
.instrument(span)
.await;
});

let left_pipeline = Self {
stream: left_rx.into(),
storage: self.storage.clone(),
concurrency: self.concurrency,
};

let right_pipeline = Self {
stream: right_rx.into(),
storage: self.storage.clone(),
concurrency: self.concurrency,
};

(left_pipeline, right_pipeline)
}

/// Combines the stream of this pipeline with the stream of `other` into a single pipeline.
///
/// Typically used to bring the two halves produced by `split_by` back together, after which
/// the combined stream can be processed with the `run` method.
///
/// Only the stream of `other` is used; every remaining setting of the returned pipeline is
/// taken from `self`.
pub fn merge(self, other: Self) -> Self {
    Self {
        stream: tokio_stream::StreamExt::merge(self.stream, other.stream)
            .boxed()
            .into(),
        ..self
    }
}

/// Throttles the stream of nodes, limiting the rate to 1 per duration.
///
/// Useful for rate limiting the ingestion pipeline. Uses `tokio_stream::StreamExt::throttle` internally, which has a granularity of 1ms.
Expand Down Expand Up @@ -570,4 +639,69 @@ mod tests {
let nodes = storage.get_all().await;
assert_eq!(nodes.len(), 2);
}

/// Splits a three node stream on the chunk prefix, rewrites the chunk on each side, merges the
/// sides back together and verifies that every node ended up in storage via the expected side.
#[test_log::test(tokio::test)]
async fn test_split_and_merge() {
    let mut loader = MockLoader::new();
    let storage = MemoryStorage::default();
    let mut seq = Sequence::new();
    loader
        .expect_into_stream()
        .times(1)
        .in_sequence(&mut seq)
        .returning(|| {
            vec![
                Ok(IngestionNode::default()),
                Ok(IngestionNode {
                    chunk: "will go left".to_string(),
                    ..IngestionNode::default()
                }),
                Ok(IngestionNode::default()),
            ]
            .into()
        });

    let pipeline = IngestionPipeline::from_loader(loader);

    // Only successfully loaded nodes with the marker prefix go left; everything else goes right.
    let (left, right) = pipeline
        .split_by(|node| matches!(node, Ok(node) if node.chunk.starts_with("will go left")));

    // Tag every node with the side it travelled through.
    let left = left
        .then(move |mut node: IngestionNode| {
            node.chunk = "left".to_string();

            Ok(node)
        })
        .log_all();

    let right = right.then(move |mut node: IngestionNode| {
        node.chunk = "right".to_string();
        Ok(node)
    });

    left.merge(right)
        .then_store_with(storage.clone())
        .run()
        .await
        .unwrap();

    let all_nodes = storage.get_all_values().await;
    let left_count = all_nodes.iter().filter(|node| node.chunk == "left").count();
    let right_count = all_nodes
        .iter()
        .filter(|node| node.chunk == "right")
        .count();

    assert_eq!(left_count, 1);
    assert_eq!(right_count, 2);
}
}
11 changes: 10 additions & 1 deletion swiftide/src/ingestion/ingestion_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use anyhow::Result;
use futures_util::stream::{self, Stream};
use pin_project_lite::pin_project;
use std::pin::Pin;
use tokio::sync::mpsc::Receiver;

use super::IngestionNode;

Expand All @@ -20,7 +21,7 @@ pin_project! {
/// Streams, iterators and vectors of `Result<IngestionNode>` can be converted into an `IngestionStream`.
pub struct IngestionStream {
#[pin]
inner: Pin<Box<dyn Stream<Item = Result<IngestionNode>> + Send>>,
pub(crate) inner: Pin<Box<dyn Stream<Item = Result<IngestionNode>> + Send>>,
}
}

Expand Down Expand Up @@ -57,6 +58,14 @@ impl Into<IngestionStream> for Pin<Box<dyn Stream<Item = Result<IngestionNode>>
}
}

/// Wraps the receiving half of a Tokio mpsc channel so it can be consumed as an
/// `IngestionStream`.
///
/// Implemented as `From` so that both `IngestionStream::from(receiver)` and `receiver.into()`
/// are available; the latter comes from the standard library's blanket `Into` implementation.
impl From<Receiver<Result<IngestionNode>>> for IngestionStream {
    fn from(receiver: Receiver<Result<IngestionNode>>) -> Self {
        Self {
            inner: tokio_stream::wrappers::ReceiverStream::new(receiver).boxed(),
        }
    }
}

impl IngestionStream {
pub fn empty() -> Self {
IngestionStream {
Expand Down

0 comments on commit 5aeb3a7

Please sign in to comment.