Skip to content

Commit

Permalink
feat(persist): in memory storage for testing, experimentation and deb…
Browse files Browse the repository at this point in the history
…ugging
  • Loading branch information
timonv committed Jun 23, 2024
1 parent eb84dd2 commit 2351867
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ edition = "2021"

[dev-dependencies]
tokio = { version = "1.0", features = ["full"] }
swiftide = { path = "../swiftide/", features = ["all"] }
swiftide = { path = "../swiftide/", features = ["all", "scraping"] }
tracing-subscriber = "0.3"
serde_json = "1.0"
spider = "1.98"

[[example]]
name = "ingest-codebase"
Expand All @@ -25,3 +26,7 @@ path = "ingest_into_redis.rs"
[[example]]
name = "ingest-markdown-metadata"
path = "ingest_markdown_lots_of_metadata.rs"

[[example]]
name = "scraping-ingest"
path = "scraping_ingest_to_markdown.rs"
26 changes: 17 additions & 9 deletions swiftide/src/integrations/scraping/loader.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use derive_builder::Builder;
use spider::website::Website;
use tokio::sync::RwLock;
use tokio::{runtime::Handle, sync::RwLock};

use crate::{
ingestion::{IngestionNode, IngestionStream},
Expand All @@ -18,8 +18,12 @@ pub struct ScrapingLoader {
}

impl ScrapingLoader {
pub fn builder() -> ScrapingLoaderBuilder {
ScrapingLoaderBuilder::default()
}

// Constructs a scrapingloader from a `spider::Website` configuration
#![allow(dead_code)]
#[allow(dead_code)]
pub fn from_spider(spider_website: Website) -> Self {
Self {
spider_website: RwLock::new(spider_website),
Expand All @@ -35,13 +39,17 @@ impl ScrapingLoader {
impl Loader for ScrapingLoader {
fn into_stream(self) -> IngestionStream {
let (tx, rx) = std::sync::mpsc::channel();
let mut spider_rx = self
.spider_website
.blocking_write()
.subscribe(0)
.expect("Failed to subscribe to spider");
let mut spider_rx = tokio::task::block_in_place(|| {
Handle::current().block_on(async {
self.spider_website
.write()
.await
.subscribe(0)
.expect("Failed to subscribe to spider")
})
});

let _recv_thread = std::thread::spawn(|| async move {
let _recv_thread = tokio::spawn(async move {
while let Ok(res) = spider_rx.recv().await {
let node = IngestionNode {
chunk: res.get_html(),
Expand All @@ -56,7 +64,7 @@ impl Loader for ScrapingLoader {
}
});

let _scrape_thread = std::thread::spawn(|| async move {
let _scrape_thread = tokio::spawn(async move {
let mut spider_website = self.spider_website.write().await;
spider_website.scrape().await;
});
Expand Down
3 changes: 3 additions & 0 deletions swiftide/src/integrations/scraping/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
mod html_to_markdown_transformer;
mod loader;

pub use html_to_markdown_transformer::HtmlToMarkdownTransformer;
pub use loader::ScrapingLoader;
1 change: 1 addition & 0 deletions swiftide/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
pub mod ingestion;
pub mod integrations;
pub mod loaders;
pub mod persist;
pub mod traits;
pub mod transformers;
pub mod type_aliases;
Expand Down
60 changes: 60 additions & 0 deletions swiftide/src/persist/memory_storage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::collections::HashMap;

use anyhow::Result;
use async_trait::async_trait;
use derive_builder::Builder;
use tokio::sync::RwLock;

use crate::{
ingestion::{IngestionNode, IngestionStream},
Persist,
};

#[derive(Debug, Default, Builder)]
#[builder(pattern = "owned")]
/// A simple in-memory storage implementation.
///
/// Great for experimentation and testing.
pub struct MemoryStorage {
data: RwLock<HashMap<String, IngestionNode>>,
#[builder(default)]
batch_size: Option<usize>,
}

impl MemoryStorage {
fn key(&self, node: &IngestionNode) -> String {
node.path.clone().to_string_lossy().to_string()
}

#[allow(dead_code)]
async fn get(&self, key: &str) -> Option<IngestionNode> {
self.data.read().await.get(key).cloned()
}
}

#[async_trait]
impl Persist for MemoryStorage {
async fn setup(&self) -> Result<()> {
Ok(())
}

async fn store(&self, node: IngestionNode) -> Result<IngestionNode> {
self.data
.write()
.await
.insert(self.key(&node), node.clone());
Ok(node)
}

async fn batch_store(&self, nodes: Vec<IngestionNode>) -> IngestionStream {
let mut lock = self.data.write().await;
for node in &nodes {
lock.insert(self.key(node), node.clone());
}
IngestionStream::iter(nodes.into_iter().map(Ok))
}

fn batch_size(&self) -> Option<usize> {
self.batch_size
}
}
2 changes: 2 additions & 0 deletions swiftide/src/persist/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
mod memory_storage;
pub use memory_storage::MemoryStorage;

0 comments on commit 2351867

Please sign in to comment.