chore: enable clippy pedantic (#132)
timonv authored Jul 7, 2024
1 parent 51c114c commit d2a9ea1
Showing 27 changed files with 261 additions and 98 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
@@ -58,6 +58,6 @@ jobs:
- uses: dtolnay/rust-toolchain@stable
with:
components: clippy,rustfmt
- name: Cache Cargo dependencies
uses: Swatinem/rust-cache@v2
- uses: r7kamura/rust-problem-matchers@v1
- name: "Clippy"
run: cargo clippy --all-targets --all-features
31 changes: 21 additions & 10 deletions CHANGELOG.md
@@ -7,16 +7,8 @@ All notable changes to this project will be documented in this file.

### Bug Fixes

- [46b3cbc](https://github.com/bosun-ai/swiftide/commit/46b3cbc114d522efd1afab05ac33b46fcfbd9159) *(changelog)* Exclude bots from detailed lines

- [9334934](https://github.com/bosun-ai/swiftide/commit/9334934e4af92b35dbc61e1f92aa90abac29ca12) *(chunkcode)* Use correct chunksizes by @timonv in [#122](https://github.com/bosun-ai/swiftide/pull/122)

- [af0775e](https://github.com/bosun-ai/swiftide/commit/af0775ec4b635318d4a2fed8df1783fdd028983e) *(ci)* Minimal components

- [9445777](https://github.com/bosun-ai/swiftide/commit/9445777e6cd462874b426ff698447c3e8f0fd5f4) *(ci)* Remove cache

- [dd70537](https://github.com/bosun-ai/swiftide/commit/dd7053748801c44fa644b770121b56f5d60ee390) *(ci)* Job cleanup and separate cache for coverage

- [dba29a0](https://github.com/bosun-ai/swiftide/commit/dba29a07fa68589151536b5ba197a69ff339ad01) *(ci)* Ensure clippy runs with all features

- [3b98334](https://github.com/bosun-ai/swiftide/commit/3b98334b2bf78cfe9c957bfa1dd3cd7c939b6c39) *(deps)* Update rust crate serde_json to v1.0.120 by @renovate[bot] in [#115](https://github.com/bosun-ai/swiftide/pull/115)
@@ -33,7 +25,7 @@ All notable changes to this project will be documented in this file.
---------
````
- [c73377f](https://github.com/bosun-ai/swiftide/commit/c73377fb695412eaa329ed937731074288088097) *(uncategorized)* Clippy
- [b498074](https://github.com/bosun-ai/swiftide/commit/b4980746b55073ce870bc897aef6721d10883acd) *(uncategorized)* Clippy
### Documentation
@@ -49,7 +41,26 @@ All notable changes to this project will be documented in this file.
- [bd72c6a](https://github.com/bosun-ai/swiftide/commit/bd72c6a62228deed722bbc22bdcd389843cde453) *(ci)* Coverage using llvm-cov
- [ad77a5f](https://github.com/bosun-ai/swiftide/commit/ad77a5faea79708de5dfee3dc0ef7ff170eebf01) *(uncategorized)* Properly configure typos
- [51c114c](https://github.com/bosun-ai/swiftide/commit/51c114ceb06db840c4952d3d0f694bfbf266681c) *(uncategorized)* Various tooling & community improvements by @timonv in [#131](https://github.com/bosun-ai/swiftide/pull/131)
````text
- **fix(ci): ensure clippy runs with all features**
- **chore(ci): coverage using llvm-cov**
- **chore: drastically improve changelog generation**
- **chore(ci): add sanity checks for pull requests**
- **chore(ci): split jobs and add typos**
````
- [84dd65d](https://github.com/bosun-ai/swiftide/commit/84dd65dc6c0ff4595f27ed061a4f4c0a2dae7202) *(uncategorized)* Rename all mentions of ingest to index by @timonv in [#130](https://github.com/bosun-ai/swiftide/pull/130) [**breaking**]
````text
Swiftide is not an ingestion pipeline (loading data), but an indexing
pipeline (prepping for search).
There is now a temporary, deprecated re-export to match the previous api.
````
- [d7d318e](https://github.com/bosun-ai/swiftide/commit/d7d318e60d42a1fce58c08e296c0aeac2674b32b) *(uncategorized)* Enable clippy pedantic
- [88429f9](https://github.com/bosun-ai/swiftide/commit/88429f9730c43e44d5707c3d1615f8509a3f2a24) *(uncategorized)* Drastically improve changelog generation
12 changes: 11 additions & 1 deletion swiftide/Cargo.toml
@@ -7,7 +7,7 @@ readme = "../README.md"
keywords = ["llm", "rag", "ai", "data", "openai"]
description = "Blazing fast, streaming document and code indexation"
categories = ["asynchronous"]
license-file = "../LICENSE"
repository = "https://github.com/bosun-ai/swiftide-rs"

[dependencies]
anyhow = { version = "1.0.86", features = ["backtrace"] }
@@ -94,8 +94,18 @@ mockall = "0.12.1"
temp-dir = "0.1.13"
wiremock = "0.6.0"

[lints.rust]
unsafe_code = "forbid"

[lints.clippy]
cargo = { level = "warn", priority = -1 }
pedantic = { level = "warn", priority = -1 }
blocks_in_conditions = "allow"
must_use_candidate = "allow"
module_name_repetitions = "allow"
missing_fields_in_debug = "allow"
# Should be fixed asap
multiple_crate_versions = "allow"

[package.metadata.docs.rs]
all-features = true
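
The lint tables above rely on `priority = -1`: the `cargo` and `pedantic` groups are applied before the individual entries beneath them (default priority 0), so the specific `allow`s win. Note that `must_use_candidate` is allowed crate-wide, yet the pipeline builders in the diff below still opt in with explicit `#[must_use]` attributes. A minimal sketch, not from this diff, of the pattern those attributes address:

````rust
#[derive(Default)]
pub struct Builder {
    concurrency: usize,
}

impl Builder {
    /// Consumes `self` and returns the updated builder; without
    /// `#[must_use]`, a caller could drop the result and silently
    /// lose the configuration.
    #[must_use]
    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
        self.concurrency = concurrency;
        self
    }
}

fn main() {
    let builder = Builder::default().with_concurrency(4);
    assert_eq!(builder.concurrency, 4);
}
````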
2 changes: 1 addition & 1 deletion swiftide/src/indexing/node.rs
@@ -88,7 +88,7 @@ impl Node {
let metadata = self
.metadata
.iter()
.map(|(k, v)| format!("{}: {}", k, v))
.map(|(k, v)| format!("{k}: {v}"))
.collect::<Vec<String>>()
.join("\n");
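
The `format!` change above is the inlined format-args style clippy pushes for (the `clippy::uninlined_format_args` lint): variables are captured directly in the format string instead of being passed positionally. A small sketch of the two forms, which produce identical output:

````rust
fn main() {
    let (k, v) = ("lang", "rust");
    // Older positional style, flagged by `clippy::uninlined_format_args`.
    let positional = format!("{}: {}", k, v);
    // Inlined capture, as adopted in the diff above.
    let inlined = format!("{k}: {v}");
    assert_eq!(positional, inlined);
}
````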

41 changes: 32 additions & 9 deletions swiftide/src/indexing/pipeline.rs
@@ -29,7 +29,7 @@ impl Default for Pipeline {
fn default() -> Self {
Self {
stream: IndexingStream::empty(),
storage: Default::default(),
storage: Vec::default(),
concurrency: num_cpus::get(),
}
}
@@ -78,6 +78,7 @@ impl Pipeline {
/// # Returns
///
/// An instance of `Pipeline` with the updated concurrency level.
#[must_use]
pub fn with_concurrency(mut self, concurrency: usize) -> Self {
self.concurrency = concurrency;
self
@@ -92,6 +93,7 @@ impl Pipeline {
/// # Returns
///
/// An instance of `Pipeline` with the updated stream that filters out cached nodes.
#[must_use]
pub fn filter_cached(mut self, cache: impl NodeCache + 'static) -> Self {
let cache = Arc::new(cache);
self.stream = self
@@ -101,13 +103,13 @@ impl Pipeline {
let span =
tracing::trace_span!("filter_cached", node_cache = ?cache, node = ?node );
async move {
if !cache.get(&node).await {
if cache.get(&node).await {
tracing::debug!("Node in cache, skipping");
Ok(None)
} else {
cache.set(&node).await;
tracing::debug!("Node not in cache, passing through");
Ok(Some(node))
} else {
tracing::debug!("Node in cache, skipping");
Ok(None)
}
}
.instrument(span)
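
The cache check above is flipped so the positive condition comes first — the shape pedantic clippy's `if_not_else` lint asks for; the behavior is unchanged. A minimal sketch of the rewrite:

````rust
/// Flagged by pedantic `clippy::if_not_else`: a negated condition with `else`.
fn route_old(in_cache: bool) -> &'static str {
    if !in_cache {
        "process"
    } else {
        "skip"
    }
}

/// Equivalent logic with the positive condition first, as in the diff above.
fn route_new(in_cache: bool) -> &'static str {
    if in_cache {
        "skip"
    } else {
        "process"
    }
}

fn main() {
    assert_eq!(route_old(true), route_new(true));
    assert_eq!(route_old(false), route_new(false));
}
````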
@@ -126,6 +128,7 @@ impl Pipeline {
/// # Returns
///
/// An instance of `Pipeline` with the updated stream that applies the transformer to each node.
#[must_use]
pub fn then(mut self, transformer: impl Transformer + 'static) -> Self {
let concurrency = transformer.concurrency().unwrap_or(self.concurrency);
let transformer = Arc::new(transformer);
@@ -154,6 +157,7 @@ impl Pipeline {
/// # Returns
///
/// An instance of `Pipeline` with the updated stream that applies the batch transformer to each batch of nodes.
#[must_use]
pub fn then_in_batch(
mut self,
batch_size: usize,
@@ -186,6 +190,7 @@ impl Pipeline {
/// # Returns
///
/// An instance of `Pipeline` with the updated stream that applies the chunker transformer to each node.
#[must_use]
pub fn then_chunk(mut self, chunker: impl ChunkerTransformer + 'static) -> Self {
let chunker = Arc::new(chunker);
let concurrency = chunker.concurrency().unwrap_or(self.concurrency);
@@ -214,6 +219,12 @@ impl Pipeline {
/// # Returns
///
/// An instance of `Pipeline` with the configured storage backend.
///
/// # Panics
///
/// Panics if the batch size is not set while batch storage is invoked.
/// The pipeline only invokes batch storing when a batch size is set, so in practice this should not happen.
#[must_use]
pub fn then_store_with(mut self, storage: impl Persist + 'static) -> Self {
let storage = Arc::new(storage);
self.storage.push(storage.clone());
@@ -259,6 +270,11 @@ impl Pipeline {
/// if sending fails.
///
/// They can either be run concurrently, alternated between or merged back together.
///
/// # Panics
///
/// Panics if the receiving pipelines' buffers are full or unavailable.
#[must_use]
pub fn split_by<P>(self, predicate: P) -> (Self, Self)
where
P: Fn(&Result<Node>) -> bool + Send + Sync + 'static,
@@ -282,13 +298,13 @@ impl Pipeline {
left_tx
.send(item)
.await
.expect("Failed to send to left stream")
.expect("Failed to send to left stream");
} else {
tracing::debug!(?item, "Sending to right stream");
right_tx
.send(item)
.await
.expect("Failed to send to right stream")
.expect("Failed to send to right stream");
}
}
})
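
The semicolons added after the `expect` calls above match pedantic clippy's `semicolon_if_nothing_returned`. For context, a usage sketch of `split_by` and `merge` based on the signatures shown in this diff — the import path and the predicate body are illustrative, not from this commit:

````rust
use swiftide::indexing::Pipeline; // path assumed from this module layout

fn split_and_merge(pipeline: Pipeline) -> Pipeline {
    // Route nodes whose chunk looks like Rust code to one branch and
    // everything else (including errors) to the other.
    let (code, rest) = pipeline
        .split_by(|result| result.as_ref().map_or(false, |node| node.chunk.contains("fn ")));
    // The halves can be transformed independently, then merged back.
    code.merge(rest)
}
````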
@@ -316,6 +332,7 @@ impl Pipeline {
/// This is useful for merging two streams that have been split using the `split_by` method.
///
/// The full stream can then be processed using the `run` method.
#[must_use]
pub fn merge(self, other: Self) -> Self {
let stream = tokio_stream::StreamExt::merge(self.stream, other.stream);

@@ -327,7 +344,8 @@ impl Pipeline {

/// Throttles the stream of nodes, limiting the rate to 1 per duration.
///
/// Useful for rate limiting the indexing pipeline. Uses tokio_stream::StreamExt::throttle internally which has a granularity of 1ms.
/// Useful for rate limiting the indexing pipeline. Uses `tokio_stream::StreamExt::throttle` internally which has a granularity of 1ms.
#[must_use]
pub fn throttle(mut self, duration: impl Into<Duration>) -> Self {
self.stream = tokio_stream::StreamExt::throttle(self.stream, duration.into())
.boxed()
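
A brief usage sketch of the throttle builder (pipeline construction elided; the import path is assumed):

````rust
use std::time::Duration;
use swiftide::indexing::Pipeline; // path assumed

fn throttled(pipeline: Pipeline) -> Pipeline {
    // At most one node per 100ms; intervals below 1ms are not meaningful
    // given the documented granularity of `tokio_stream`'s throttle.
    pipeline.throttle(Duration::from_millis(100))
}
````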
@@ -339,6 +357,7 @@ impl Pipeline {
//
// This method filters out errors encountered by the pipeline, preventing them from bubbling up and terminating the stream.
// Note that errors are not logged.
#[must_use]
pub fn filter_errors(mut self) -> Self {
self.stream = self
.stream
@@ -358,6 +377,7 @@ impl Pipeline {
/// This allows you to skip specific errors or nodes, or do ad hoc inspection.
///
/// If the closure returns true, the result is kept, otherwise it is skipped.
#[must_use]
pub fn filter<F>(mut self, filter: F) -> Self
where
F: Fn(&Result<Node>) -> bool + Send + Sync + 'static,
@@ -377,13 +397,15 @@ impl Pipeline {
/// Logs all results processed by the pipeline.
///
/// This method logs all results processed by the pipeline at the `DEBUG` level.
#[must_use]
pub fn log_all(self) -> Self {
self.log_errors().log_nodes()
}

/// Logs all errors encountered by the pipeline.
///
/// This method logs all errors encountered by the pipeline at the `ERROR` level.
#[must_use]
pub fn log_errors(mut self) -> Self {
self.stream = self
.stream
@@ -396,6 +418,7 @@ impl Pipeline {
/// Logs all nodes processed by the pipeline.
///
/// This method logs all nodes processed by the pipeline at the `DEBUG` level.
#[must_use]
pub fn log_nodes(mut self) -> Self {
self.stream = self
.stream
@@ -493,7 +516,7 @@ mod tests {
let mut nodes = vec![];
for i in 0..3 {
let mut node = node.clone();
node.chunk = format!("transformed_chunk_{}", i);
node.chunk = format!("transformed_chunk_{i}");
nodes.push(Ok(node));
}
nodes.into()
85 changes: 85 additions & 0 deletions swiftide/src/ingestion/ingestion_stream.rs
@@ -0,0 +1,85 @@
#![allow(clippy::from_over_into)]
//! This module defines the `IngestionStream` type, which is used for handling asynchronous streams of `IngestionNode` items in the ingestion pipeline.
use anyhow::Result;
use futures_util::stream::{self, Stream};
use pin_project_lite::pin_project;
use std::pin::Pin;
use tokio::sync::mpsc::Receiver;

use super::IngestionNode;

pub use futures_util::{StreamExt, TryStreamExt};

// We need to inform the compiler that `inner` is pinned as well
pin_project! {
/// An asynchronous stream of `IngestionNode` items.
///
/// Wraps an internal stream of `Result<IngestionNode>` items.
///
/// Streams, iterators and vectors of `Result<IngestionNode>` can be converted into an `IngestionStream`.
pub struct IngestionStream {
#[pin]
pub(crate) inner: Pin<Box<dyn Stream<Item = Result<IngestionNode>> + Send>>,
}
}

impl Stream for IngestionStream {
type Item = Result<IngestionNode>;

fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
this.inner.poll_next(cx)
}
}

impl Into<IngestionStream> for Vec<Result<IngestionNode>> {
fn into(self) -> IngestionStream {
IngestionStream::iter(self)
}
}

impl Into<IngestionStream> for Result<Vec<IngestionNode>> {
fn into(self) -> IngestionStream {
match self {
Ok(nodes) => IngestionStream::iter(nodes.into_iter().map(Ok)),
Err(err) => IngestionStream::iter(vec![Err(err)]),
}
}
}

impl Into<IngestionStream> for Pin<Box<dyn Stream<Item = Result<IngestionNode>> + Send>> {
fn into(self) -> IngestionStream {
IngestionStream { inner: self }
}
}

impl Into<IngestionStream> for Receiver<Result<IngestionNode>> {
fn into(self) -> IngestionStream {
IngestionStream {
inner: tokio_stream::wrappers::ReceiverStream::new(self).boxed(),
}
}
}

impl IngestionStream {
pub fn empty() -> Self {
IngestionStream {
inner: stream::empty().boxed(),
}
}

// NOTE: Can we really guarantee that the iterator will outlive the stream?
pub fn iter<I>(iter: I) -> Self
where
I: IntoIterator<Item = Result<IngestionNode>> + Send + 'static,
<I as IntoIterator>::IntoIter: Send,
{
IngestionStream {
inner: stream::iter(iter).boxed(),
}
}
}
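
The `Into` impls above make pipeline sources ergonomic: vectors, results, boxed streams, and channel receivers all convert into an `IngestionStream`. A small sketch of the vector conversion — it assumes `IngestionNode` implements `Default`, and the import paths are illustrative:

````rust
use anyhow::Result;
use swiftide::ingestion::{IngestionNode, IngestionStream}; // paths assumed

fn stream_from_nodes() -> IngestionStream {
    // `Vec<Result<IngestionNode>>` converts via the `Into` impl above.
    let nodes: Vec<Result<IngestionNode>> = vec![Ok(IngestionNode::default())];
    nodes.into()
}

fn empty_stream() -> IngestionStream {
    // An empty stream is a useful neutral starting point.
    IngestionStream::empty()
}
````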
