From 235186707182e8c39b8f22c6dd9d54eb32f7d1e5 Mon Sep 17 00:00:00 2001 From: Timon Vonk Date: Sun, 23 Jun 2024 18:43:11 +0200 Subject: [PATCH] feat(persist): in memory storage for testing, experimentation and debugging --- Cargo.lock | 1 + examples/Cargo.toml | 7 ++- swiftide/src/integrations/scraping/loader.rs | 26 ++++++--- swiftide/src/integrations/scraping/mod.rs | 3 + swiftide/src/lib.rs | 1 + swiftide/src/persist/memory_storage.rs | 60 ++++++++++++++++++++ swiftide/src/persist/mod.rs | 2 + 7 files changed, 90 insertions(+), 10 deletions(-) create mode 100644 swiftide/src/persist/memory_storage.rs create mode 100644 swiftide/src/persist/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 0fc20fcb..e9f34867 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1103,6 +1103,7 @@ name = "examples" version = "0.0.0" dependencies = [ "serde_json", + "spider", "swiftide", "tokio", "tracing-subscriber", diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 17d902cf..6fe1251a 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -6,9 +6,10 @@ edition = "2021" [dev-dependencies] tokio = { version = "1.0", features = ["full"] } -swiftide = { path = "../swiftide/", features = ["all"] } +swiftide = { path = "../swiftide/", features = ["all", "scraping"] } tracing-subscriber = "0.3" serde_json = "1.0" +spider = "1.98" [[example]] name = "ingest-codebase" @@ -25,3 +26,7 @@ path = "ingest_into_redis.rs" [[example]] name = "ingest-markdown-metadata" path = "ingest_markdown_lots_of_metadata.rs" + +[[example]] +name = "scraping-ingest" +path = "scraping_ingest_to_markdown.rs" diff --git a/swiftide/src/integrations/scraping/loader.rs b/swiftide/src/integrations/scraping/loader.rs index 6b837981..bc30cf49 100644 --- a/swiftide/src/integrations/scraping/loader.rs +++ b/swiftide/src/integrations/scraping/loader.rs @@ -1,6 +1,6 @@ use derive_builder::Builder; use spider::website::Website; -use tokio::sync::RwLock; +use tokio::{runtime::Handle, sync::RwLock}; use crate::{ ingestion::{IngestionNode, IngestionStream}, @@ -18,8 +18,12 @@ pub struct ScrapingLoader { } impl ScrapingLoader { + pub fn builder() -> ScrapingLoaderBuilder { + ScrapingLoaderBuilder::default() + } + // Constructs a scrapingloader from a `spider::Website` configuration - #![allow(dead_code)] + #[allow(dead_code)] pub fn from_spider(spider_website: Website) -> Self { Self { spider_website: RwLock::new(spider_website), @@ -35,13 +39,17 @@ impl ScrapingLoader { impl Loader for ScrapingLoader { fn into_stream(self) -> IngestionStream { let (tx, rx) = std::sync::mpsc::channel(); - let mut spider_rx = self - .spider_website - .blocking_write() - .subscribe(0) - .expect("Failed to subscribe to spider"); + let mut spider_rx = tokio::task::block_in_place(|| { + Handle::current().block_on(async { + self.spider_website + .write() + .await + .subscribe(0) + .expect("Failed to subscribe to spider") + }) + }); - let _recv_thread = std::thread::spawn(|| async move { + let _recv_thread = tokio::spawn(async move { while let Ok(res) = spider_rx.recv().await { let node = IngestionNode { chunk: res.get_html(), @@ -56,7 +64,7 @@ impl Loader for ScrapingLoader { } }); - let _scrape_thread = std::thread::spawn(|| async move { + let _scrape_thread = tokio::spawn(async move { let mut spider_website = self.spider_website.write().await; spider_website.scrape().await; }); diff --git a/swiftide/src/integrations/scraping/mod.rs b/swiftide/src/integrations/scraping/mod.rs index c0c23ab3..03e4aa7e 100644 --- a/swiftide/src/integrations/scraping/mod.rs +++ b/swiftide/src/integrations/scraping/mod.rs @@ -1,2 +1,5 @@ mod html_to_markdown_transformer; mod loader; + +pub use html_to_markdown_transformer::HtmlToMarkdownTransformer; +pub use loader::ScrapingLoader; diff --git a/swiftide/src/lib.rs b/swiftide/src/lib.rs index cf728364..1e726f51 100644 --- a/swiftide/src/lib.rs +++ b/swiftide/src/lib.rs @@ -16,6 +16,7 @@ pub mod ingestion; pub mod integrations; pub mod loaders; +pub mod persist; pub mod traits; pub mod transformers; pub mod type_aliases; diff --git a/swiftide/src/persist/memory_storage.rs b/swiftide/src/persist/memory_storage.rs new file mode 100644 index 00000000..55277d96 --- /dev/null +++ b/swiftide/src/persist/memory_storage.rs @@ -0,0 +1,60 @@ +use std::collections::HashMap; + +use anyhow::Result; +use async_trait::async_trait; +use derive_builder::Builder; +use tokio::sync::RwLock; + +use crate::{ + ingestion::{IngestionNode, IngestionStream}, + Persist, +}; + +#[derive(Debug, Default, Builder)] +#[builder(pattern = "owned")] +/// A simple in-memory storage implementation. +/// +/// Great for experimentation and testing. +pub struct MemoryStorage { + data: RwLock>, + #[builder(default)] + batch_size: Option, +} + +impl MemoryStorage { + fn key(&self, node: &IngestionNode) -> String { + node.path.clone().to_string_lossy().to_string() + } + + #[allow(dead_code)] + async fn get(&self, key: &str) -> Option { + self.data.read().await.get(key).cloned() + } +} + +#[async_trait] +impl Persist for MemoryStorage { + async fn setup(&self) -> Result<()> { + Ok(()) + } + + async fn store(&self, node: IngestionNode) -> Result { + self.data + .write() + .await + .insert(self.key(&node), node.clone()); + Ok(node) + } + + async fn batch_store(&self, nodes: Vec) -> IngestionStream { + let mut lock = self.data.write().await; + for node in &nodes { + lock.insert(self.key(node), node.clone()); + } + IngestionStream::iter(nodes.into_iter().map(Ok)) + } + + fn batch_size(&self) -> Option { + self.batch_size + } +} diff --git a/swiftide/src/persist/mod.rs b/swiftide/src/persist/mod.rs new file mode 100644 index 00000000..73ead84a --- /dev/null +++ b/swiftide/src/persist/mod.rs @@ -0,0 +1,2 @@ +mod memory_storage; +pub use memory_storage::MemoryStorage;