From 50f4e15901cdaa59ad6489f1edfd37f1ee6da298 Mon Sep 17 00:00:00 2001 From: Vince Juliano Date: Tue, 15 Oct 2024 14:09:56 -0400 Subject: [PATCH] chore(su): readme change for local store #987 --- servers/su/README.md | 6 ++++ .../src/domain/clients/local_store/store.rs | 28 ++++++++----------- .../src/domain/clients/local_store/tests.rs | 1 - servers/su/src/domain/clients/store.rs | 5 +++- servers/su/src/domain/core/dal.rs | 5 +++- servers/su/src/domain/core/scheduler.rs | 2 +- 6 files changed, 27 insertions(+), 20 deletions(-) diff --git a/servers/su/README.md b/servers/su/README.md index f8f234e02..620ff103f 100644 --- a/servers/su/README.md +++ b/servers/su/README.md @@ -55,6 +55,12 @@ Create a .env file with the following variables, or set them in the OS: - `ENABLE_PROCESS_ASSIGNMENT` enables AOP-6 boot loader, if enabled, the Process on a new spawn will become the first Message/Nonce in its message list. It will get an Assignment. - `ARWEAVE_URL_LIST` list of arweave urls that have tx access aka url/txid returns the tx. Used by gateway calls for checking transactions etc... +## Experimental environment variables +To use the expirimental fully local storage system set the following evnironment variables. +- `USE_LOCAL_STORE` if true the SU will operate on purely RocksDB +- `SU_FILE_DB_DIR` a local RocksDB directory of bundles +- `SU_INDEX_DB_DIR` a local index of processes and messages + > You can also use a `.env` file to set environment variables when running in > development mode, See the `.env.example` for an example `.env` diff --git a/servers/su/src/domain/clients/local_store/store.rs b/servers/su/src/domain/clients/local_store/store.rs index 20d0facdc..ae598a5f6 100644 --- a/servers/su/src/domain/clients/local_store/store.rs +++ b/servers/su/src/domain/clients/local_store/store.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use async_trait::async_trait; -use dashmap::DashMap; use rocksdb::{Options, DB}; use tokio::time::{sleep, Duration}; @@ -27,11 +26,6 @@ pub struct LocalStoreClient { and Messages, only public for migration purposes */ pub index_db: DB, - /* - A cache of the latest messages for a process based - on the last from parameter that has been queried - */ - pub index_cache: DashMap> } impl From for StoreErrorType { @@ -68,13 +62,10 @@ impl LocalStoreClient { Err(e) => panic!("failed to open cf with options: {}", e), }; - let index_cache = DashMap::new(); - Ok(LocalStoreClient { _logger: logger, file_db, index_db, - index_cache }) } @@ -176,7 +167,7 @@ impl LocalStoreClient { for querying message ranges for the /processid message list */ - fn fetch_message_range( + async fn fetch_message_range( &self, process_id: &String, from: &Option, @@ -235,7 +226,7 @@ impl LocalStoreClient { } } - paginated_keys.push((key_str, assignment_id)); + paginated_keys.push((key_str.clone(), assignment_id)); count += 1; match limit { @@ -450,8 +441,9 @@ impl DataStore for LocalStoreClient { actual_limit -= 1; } - let (paginated_keys, has_next_page) = - self.fetch_message_range(process_id, from, to, &Some(actual_limit))?; + let (paginated_keys, has_next_page) = self + .fetch_message_range(process_id, from, to, &Some(actual_limit)) + .await?; /* Fetch the messages for each paginated key. This @@ -482,9 +474,13 @@ impl DataStore for LocalStoreClient { were pulling all the message keys into memory and picking the latest one. */ - fn get_latest_message(&self, process_id: &str) -> Result, StoreErrorType> { - let (paginated_keys, _) = - self.fetch_message_range(&process_id.to_string(), &None, &None, &None)?; + async fn get_latest_message( + &self, + process_id: &str, + ) -> Result, StoreErrorType> { + let (paginated_keys, _) = self + .fetch_message_range(&process_id.to_string(), &None, &None, &None) + .await?; if paginated_keys.len() < 1 { return Ok(None); diff --git a/servers/su/src/domain/clients/local_store/tests.rs b/servers/su/src/domain/clients/local_store/tests.rs index 83ae3ac6d..a4dfbd51d 100644 --- a/servers/su/src/domain/clients/local_store/tests.rs +++ b/servers/su/src/domain/clients/local_store/tests.rs @@ -120,7 +120,6 @@ mod tests { let (process_bundle, message_bundles) = bundle_list(); let test_process = Process::from_bytes(process_bundle.clone())?; - println!("{:?}", test_process); client.save_process(&test_process, &process_bundle)?; for bundle in message_bundles.iter() { diff --git a/servers/su/src/domain/clients/store.rs b/servers/su/src/domain/clients/store.rs index 4d78da2fd..c4b8505de 100644 --- a/servers/su/src/domain/clients/store.rs +++ b/servers/su/src/domain/clients/store.rs @@ -1058,7 +1058,10 @@ impl DataStore for StoreClient { } } - fn get_latest_message(&self, process_id_in: &str) -> Result, StoreErrorType> { + async fn get_latest_message( + &self, + process_id_in: &str, + ) -> Result, StoreErrorType> { use super::schema::messages::dsl::*; /* This must use get_conn because it needs diff --git a/servers/su/src/domain/core/dal.rs b/servers/su/src/domain/core/dal.rs index 41cbfebd9..950d8cb31 100644 --- a/servers/su/src/domain/core/dal.rs +++ b/servers/su/src/domain/core/dal.rs @@ -130,7 +130,10 @@ pub trait DataStore: Send + Sync { limit: &Option, ) -> Result; fn get_message(&self, message_id_in: &str) -> Result; - fn get_latest_message(&self, process_id_in: &str) -> Result, StoreErrorType>; + async fn get_latest_message( + &self, + process_id_in: &str, + ) -> Result, StoreErrorType>; fn check_existing_message(&self, message_id: &String) -> Result<(), StoreErrorType>; } diff --git a/servers/su/src/domain/core/scheduler.rs b/servers/su/src/domain/core/scheduler.rs index 09d963a03..f12bd3c70 100644 --- a/servers/su/src/domain/core/scheduler.rs +++ b/servers/su/src/domain/core/scheduler.rs @@ -139,7 +139,7 @@ impl ProcessScheduler { (cached_info.schedule_info.epoch, new_nonce, new_hash_chain) } else { - let latest_message = match self.deps.data_store.get_latest_message(&id) { + let latest_message = match self.deps.data_store.get_latest_message(&id).await { Ok(m) => m, Err(e) => return Err(format!("{:?}", e)), };