From 0a5fbead1fe5c47e597d4e0ba2f10fff4a103708 Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Thu, 5 Sep 2024 11:29:33 +0900
Subject: [PATCH] introducing asymmetric reader/writer

---
 .../file_backed/file_backed_index/mod.rs      |  20 -
 .../file_backed/lazy_file_backed_index.rs     |  41 +-
 .../src/metastore/file_backed/mod.rs          | 431 ++++++++----------
 3 files changed, 212 insertions(+), 280 deletions(-)

diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs
index 683692bc5e6..35dde211fbb 100644
--- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs
+++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs
@@ -66,14 +66,6 @@ pub(crate) struct FileBackedIndex {
     delete_tasks: Vec<DeleteTask>,
     /// Stamper.
     stamper: Stamper,
-    /// Flag used to avoid polling the metastore if
-    /// the process is actually writing the metastore.
-    ///
-    /// The logic is "soft". We avoid the polling step
-    /// if the metastore wrote some value since the last
-    /// polling loop.
-    recently_modified: bool,
-
 }
 
 #[cfg(any(test, feature = "testsuite"))]
@@ -150,7 +142,6 @@ impl From<IndexMetadata> for FileBackedIndex {
             per_source_shards,
             delete_tasks: Default::default(),
             stamper: Default::default(),
-            recently_modified: false,
         }
     }
 }
@@ -185,20 +176,9 @@ impl FileBackedIndex {
             per_source_shards,
             delete_tasks,
             stamper: Stamper::new(last_opstamp),
-            recently_modified: false,
         }
     }
 
-    /// Sets the `recently_modified` flag to false and returns the previous value.
-    pub fn flip_recently_modified_down(&mut self) -> bool {
-        std::mem::replace(&mut self.recently_modified, false)
-    }
-
-    /// Marks the file as `recently_modified`.
-    pub fn set_recently_modified(&mut self) {
-        self.recently_modified = true;
-    }
-
     /// Index ID accessor.
     pub fn index_id(&self) -> &str {
         self.metadata.index_id()
diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/lazy_file_backed_index.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/lazy_file_backed_index.rs
index 55c8f571241..0602c0386f8 100644
--- a/quickwit/quickwit-metastore/src/metastore/file_backed/lazy_file_backed_index.rs
+++ b/quickwit/quickwit-metastore/src/metastore/file_backed/lazy_file_backed_index.rs
@@ -24,12 +24,11 @@ use quickwit_proto::metastore::{EntityKind, MetastoreError, MetastoreResult};
 use quickwit_proto::types::IndexId;
 use quickwit_storage::Storage;
 use tokio::sync::{Mutex, OnceCell};
-use tokio::time::Instant;
 use tracing::error;
 
 use super::file_backed_index::FileBackedIndex;
 use super::store_operations::{load_index, METASTORE_FILE_NAME};
-use super::FileBackedIndexCell;
+use super::{FileBackedIndexCell, FileBackedIndexWriter};
 
 /// Lazy [`FileBackedIndex`]. It loads a `FileBackedIndex` on demand. When the index is first
 /// loaded, it optionally spawns a task to periodically poll the storage and update the index.
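The heart of this patch is the asymmetric split between one mutex-guarded writer and many lock-free readers: the writer mutates a private copy of the index and publishes immutable snapshots through a tokio watch channel, while readers merely clone the latest published `Arc`. A minimal, self-contained sketch of that pattern, with a toy `State` type standing in for `FileBackedIndex` (the names below are illustrative, not the patch's actual API):

    use std::sync::Arc;
    use tokio::sync::watch;

    // Toy stand-in for `FileBackedIndex`.
    #[derive(Clone, Debug)]
    struct State {
        counter: u64,
    }

    // The writer owns a private `write_state` and publishes immutable
    // snapshots; readers clone the latest snapshot without locking.
    struct Writer {
        write_state: State,
        published_tx: watch::Sender<Option<Arc<State>>>,
    }

    impl Writer {
        fn publish(&mut self) {
            let _ = self
                .published_tx
                .send(Some(Arc::new(self.write_state.clone())));
        }
    }

    #[tokio::main]
    async fn main() {
        let initial = State { counter: 0 };
        let (tx, rx) = watch::channel(Some(Arc::new(initial.clone())));
        let mut writer = Writer { write_state: initial, published_tx: tx };

        // Reader path: a cheap snapshot, no mutex involved.
        let view = rx.borrow().clone().unwrap();
        assert_eq!(view.counter, 0);

        // Writer path: mutate privately, then publish a new snapshot.
        writer.write_state.counter += 1;
        writer.publish();
        assert_eq!(rx.borrow().clone().unwrap().counter, 1);
    }

Sending `None` on the channel doubles as the "discarded" signal, which is how the patch invalidates a cached index after an uncertain write.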
@@ -37,7 +36,7 @@ pub(crate) struct LazyFileBackedIndex {
     index_id: IndexId,
     storage: Arc<dyn Storage>,
     polling_interval_opt: Option<Duration>,
-    lazy_index: OnceCell<Arc<Mutex<FileBackedIndexCell>>>,
+    lazy_index: OnceCell<FileBackedIndexCell>,
 }
 
 impl LazyFileBackedIndex {
@@ -48,8 +47,7 @@ impl LazyFileBackedIndex {
         polling_interval_opt: Option<Duration>,
         file_backed_index: Option<FileBackedIndex>,
     ) -> Self {
-        let index_mutex_opt =
-            file_backed_index.map(|index| Arc::new(Mutex::new(FileBackedIndexCell::new(index))));
+        let index_mutex_opt = file_backed_index.map(FileBackedIndexCell::new);
         // If the polling interval is configured and the index is already loaded,
         // spawn immediately the polling task
         if let Some(index_mutex) = &index_mutex_opt {
@@ -57,7 +55,7 @@ impl LazyFileBackedIndex {
                 spawn_index_metadata_polling_task(
                     storage.clone(),
                     index_id.clone(),
-                    Arc::downgrade(index_mutex),
+                    Arc::downgrade(&index_mutex.writer),
                     polling_interval,
                 );
             }
@@ -72,23 +70,23 @@ impl LazyFileBackedIndex {
     /// Gets a synchronized `FileBackedIndex`. If the index wasn't provided on creation, we load it
     /// lazily on the first call of this method.
-    pub async fn get(&self) -> MetastoreResult<Arc<Mutex<FileBackedIndexCell>>> {
+    pub(crate) async fn get(&self) -> MetastoreResult<FileBackedIndexCell> {
         self.lazy_index
             .get_or_try_init(|| async move {
                 let index = load_index(&*self.storage, &self.index_id).await?;
                 let file_backed_index_cell = FileBackedIndexCell::new(index);
-                let index_mutex = Arc::new(Mutex::new(file_backed_index_cell));
+                let file_backed_index_writer = Arc::downgrade(&file_backed_index_cell.writer);
                 // When the index is loaded lazily, the polling task is not started in the
                 // constructor so we do it here when the index is actually loaded.
                 if let Some(polling_interval) = self.polling_interval_opt {
                     spawn_index_metadata_polling_task(
                         self.storage.clone(),
                         self.index_id.clone(),
-                        Arc::downgrade(&index_mutex),
+                        file_backed_index_writer,
                         polling_interval,
                     );
                 }
-                Ok(index_mutex)
+                Ok(file_backed_index_cell)
             })
             .await
             .cloned()
@@ -98,20 +96,23 @@
 async fn poll_index_metadata_once(
     storage: &dyn Storage,
     index_id: &str,
-    index_mutex: &Mutex<FileBackedIndexCell>,
+    index_writer: &Mutex<FileBackedIndexWriter>,
 ) {
-    todo!();
-    // FIXME
-    /*
-    let mut locked_index = index_mutex.lock().await;
-    if locked_index.flip_recently_modified_down() {
+    let mut locked_index = index_writer.lock().await;
+    // A local write is in flight: skip polling so we do not clobber it.
+    if locked_index.upload_task.is_some() {
+        return;
+    }
+    // TODO: make this cooldown period configurable.
+    if locked_index.last_push.elapsed() < Duration::from_secs(30) {
         return;
     }
     let load_index_result = load_index(storage, index_id).await;
     match load_index_result {
         Ok(index) => {
-            *locked_index = index;
+            locked_index.write_state = index;
+            locked_index.publish();
         }
         Err(MetastoreError::NotFound(EntityKind::Index { .. })) => {
             // The index has been deleted by the file-backed metastore holding a reference to this
@@ -127,13 +127,12 @@ async fn poll_index_metadata_once(
             );
         }
     }
-    */
 }
 
 fn spawn_index_metadata_polling_task(
     storage: Arc<dyn Storage>,
     index_id: IndexId,
-    metastore_weak: Weak<Mutex<FileBackedIndexCell>>,
+    metastore_weak: Weak<Mutex<FileBackedIndexWriter>>,
     polling_interval: Duration,
 ) {
     tokio::task::spawn(async move {
         let mut interval = tokio::time::interval(polling_interval);
         interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
         interval.tick().await; //< this is to prevent a fetch right after the first population of the data.
-        while let Some(metadata_mutex) = metastore_weak.upgrade() {
+        while let Some(metadata_writer) = metastore_weak.upgrade() {
             interval.tick().await;
-            poll_index_metadata_once(&*storage, &index_id, &metadata_mutex).await;
+            poll_index_metadata_once(&*storage, &index_id, &*metadata_writer).await;
         }
     });
 }
diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs
index a28deaafa15..4a29bfedc90 100644
--- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs
+++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs
@@ -31,9 +31,9 @@ pub(crate) mod manifest;
 mod state;
 mod store_operations;
 
-use core::fmt;
 use std::collections::hash_map::Entry;
 use std::collections::HashMap;
+use std::fmt;
 use std::path::Path;
 use std::sync::Arc;
 use std::time::Duration;
@@ -67,9 +67,8 @@ use quickwit_proto::metastore::{
 use quickwit_proto::types::{IndexId, IndexUid};
 use quickwit_storage::Storage;
 use time::OffsetDateTime;
-use tokio::sync::{oneshot, Mutex, Notify, OnceCell, OwnedMutexGuard, RwLock};
+use tokio::sync::{watch, Mutex, OwnedMutexGuard, RwLock};
 use tokio::time::Instant;
-use tracing::Instrument;
 
 use self::file_backed_index::FileBackedIndex;
 pub use self::file_backed_metastore_factory::FileBackedMetastoreFactory;
@@ -125,38 +124,72 @@ impl Drop for UploadTask {
     }
 }
 
-struct FileBackedIndexCell {
-    pub file_backed_index: FileBackedIndex,
+pub(crate) struct FileBackedIndexWriter {
+    write_state: FileBackedIndex,
+    last_push: Instant,
+    published_view_rx: watch::Receiver<Option<Arc<FileBackedIndex>>>,
+    published_view_tx: watch::Sender<Option<Arc<FileBackedIndex>>>,
+    upload_task: Option<UploadTask>,
+    discarded: bool,
+}
+
+impl FileBackedIndexWriter {
+    pub async fn rollback(&mut self) {
+        drop(self.upload_task.take());
+        self.discarded = false;
+        if let Some(rollback_view) = self.published_view_rx.borrow().clone() {
+            self.write_state = (*rollback_view).clone();
+        }
+    }
+
+    /// Marks the index writer as discarded.
+    pub fn discard(&mut self) {
+        self.discarded = true;
+        let _ = self.published_view_tx.send(None);
+    }
+
+    /// Publishes the `write` version: the read view is updated to the current write version.
+    ///
+    /// Note that this method itself does NOT perform the push.
+    pub fn publish(&mut self) {
+        let _ = self
            .published_view_tx
            .send(Some(Arc::new(self.write_state.clone())));
+        self.discarded = false;
+        self.last_push = Instant::now();
+    }
+}
+
+#[derive(Clone)]
+pub(crate) struct FileBackedIndexCell {
+    pub published_view_rx: watch::Receiver<Option<Arc<FileBackedIndex>>>,
     /// The last time an index was modified and pushed.
     /// This instant is to ensure we enforce a cooldown between 2
     /// push requests.
-    pub last_update: Instant,
-    pub write_state: FileBackedIndex,
-    upload_task: Option<UploadTask>,
-
-    /// Has been discarded. This field exists to make
-    /// it possible to discard this entry if there is an error
-    /// while mutating the Index.
-    // TODO move this logic to the cell.
-    pub discarded: bool,
+    pub(crate) writer: Arc<Mutex<FileBackedIndexWriter>>,
 }
 
 impl FileBackedIndexCell {
     pub fn new(file_backed_index: FileBackedIndex) -> Self {
-        FileBackedIndexCell {
-            file_backed_index: file_backed_index.clone(),
-            last_update: Instant::now(),
+        let (published_view_tx, published_view_rx) =
+            watch::channel(Some(Arc::new(file_backed_index.clone())));
+        let writer = FileBackedIndexWriter {
             write_state: file_backed_index,
+            last_push: Instant::now(),
+            published_view_tx,
+            published_view_rx: published_view_rx.clone(),
             upload_task: None,
             discarded: false,
+        };
+        FileBackedIndexCell {
+            published_view_rx,
+            writer: Arc::new(Mutex::new(writer)),
         }
     }
 
-    pub async fn cancel_scheduled_write(&mut self) {
-        drop(self.upload_task.take());
-        // better message FIXME.
-        // TODO check the property described in the expect.
+    pub fn is_discarded(&self) -> bool {
+        self.published_view_rx.borrow().is_none()
     }
 }
 
@@ -210,7 +243,7 @@ impl fmt::Debug for FileBackedMetastore {
     }
 }
 
-const COOLDOWN: Duration = Duration::from_secs(1);
+const METASTORE_PUSH_COOLDOWN: Duration = Duration::from_millis(1_200);
 
 impl FileBackedMetastore {
     /// Creates a [`FileBackedMetastore`] for tests.
@@ -252,36 +285,30 @@ impl FileBackedMetastore {
         Ok(metastore)
     }
 
-    /// - cancel
-    /// - return
-    /// - wait
-    /// - schedule now
-    /// - schedule later
     /// Mutates an index state.
+    ///
+    /// If the mutation is successful, this function returns the mutated version of the index right
+    /// after the update.
     async fn mutate<T>(
         &self,
         index_uid: &IndexUid,
         mutate_fn: impl FnOnce(&mut FileBackedIndex) -> MetastoreResult<MutationOccurred<T>>,
     ) -> MetastoreResult<T> {
         let index_id = &index_uid.index_id;
-        let mut locked_index_cell = self.get_locked_index(index_id).await?;
-        let index_lock = self.index(index_id).await?;
+        let mut locked_index_writer = self.get_locked_index(index_id).await?;
 
-        if locked_index_cell.file_backed_index.index_uid() != index_uid {
+        if locked_index_writer.write_state.index_uid() != index_uid {
             return Err(MetastoreError::NotFound(EntityKind::Index {
                 index_id: index_id.to_string(),
             }));
         }
 
-        let write_state: &mut FileBackedIndex = &mut locked_index_cell.write_state;
-
-        let mut trigger_eventual_sync = false;
+        let write_state: &mut FileBackedIndex = &mut locked_index_writer.write_state;
 
         let value: T = match mutate_fn(write_state) {
             Ok(MutationOccurred::Yes(value)) => value,
             Ok(MutationOccurred::No(value)) => {
-                if locked_index_cell.upload_task.is_none() {
+                if locked_index_writer.upload_task.is_none() {
                     // The write state has not been modified yet. We can simply return right away.
                     return Ok(value);
                 }
                 value
             }
             Err(metastore_error) => {
                 // TODO: mark the batch as failed to prevent the scheduled batch from being written.
-                locked_index_cell.cancel_scheduled_write().await;
+                locked_index_writer.rollback().await;
                 return Err(metastore_error);
             }
         };
 
-        let storage: Arc<dyn Storage> = self.storage.clone();
-
-        if locked_index_cell.upload_task.is_none() {
-            let index_lock = self.index(index_id).await?;
-            let (tx, rx) = broadcast_oneshot_channel::<MetastoreResult<()>>();
-            let elapsed_since_last_update: Duration = locked_index_cell.last_update.elapsed();
+        if locked_index_writer.upload_task.is_none() {
+            let elapsed_since_last_update: Duration = locked_index_writer.last_push.elapsed();
             let remaining_until_end_of_cooldown_opt =
-                COOLDOWN.checked_sub(elapsed_since_last_update);
-            let task_handle = tokio::task::spawn(async move {
-                if let Some(remaining_until_end_of_cooldown) = remaining_until_end_of_cooldown_opt {
-                    tokio::time::sleep(remaining_until_end_of_cooldown).await;
-                }
-
-                let mut locked_index = index_lock.lock().await;
-                let put_index_result = put_index(&*storage, &locked_index.write_state).await;
-
-                if put_index_result.is_err() {
-                    // Depending on the error, the write may have been successful. We mark the current
-                    // index as discarded to make sure we reload the data from the metastore before using it.
-                    locked_index.discarded = true;
-                } else {
-                    locked_index.file_backed_index = locked_index.write_state.clone();
-                }
-                tx.send(put_index_result);
-
-                locked_index.last_update = Instant::now();
-                locked_index.upload_task = None;
-            });
-            locked_index_cell.upload_task = Some(UploadTask {
-                upload_result: rx,
-                task_handle,
-            });
+                METASTORE_PUSH_COOLDOWN.checked_sub(elapsed_since_last_update);
+            let upload_task = self
+                .schedule_push_index(index_id, remaining_until_end_of_cooldown_opt)
+                .await?;
+            locked_index_writer.last_push = Instant::now();
+            locked_index_writer.upload_task = Some(upload_task);
         }
 
-        let rx = locked_index_cell
+        let rx = locked_index_writer
             .upload_task
             .as_ref()
             .unwrap()
@@ -339,118 +343,46 @@ impl FileBackedMetastore {
             .map_err(|_cancelled| MetastoreError::Internal {
                 message: "".to_string(),
                 cause: "".to_string(),
-            })?;
+            })??;
 
         Ok(value)
-
-        // if trigger_eventual_sync {
-        //     self.schedule_eventual_index_write();
-        // }
-
-        // let (value, publish_notify) = if let Some(write_batch_state) =
-        //     locked_index.write_batch_state.as_mut() {
-        //     // There is some ongoing batch.
-        //     // We just add our mutation to the batch.
-        //     let value = match mutate_fn(&mut write_batch_state.file_backed_index) {
-        //         Ok(MutationOccurred::Yes(value)) | Ok(MutationOccurred::No(value)) => {
-        //             value
-        //         }
-        //         Err(_metastore_error) => {
-        //             // Mark the batch as failed.
-        //             todo!();
-        //         }
-        //     };
-        //     let publish_notify = write_batch_state.publish_notify.clone();
-        //     (value, publish_notify)
-        // } else {
-        //     // No ongoing batch. We need to start a new batch.
-        //     let write_batch_state = WriteBatchState {
-        //         file_backed_index: locked_index.file_backed_index.clone(),
-        //         publish_notify: Arc::new(Notify::new()),
-        //     };
-        //     let value = match mutate_fn(&mut write_batch_state.file_backed_index) {
-        //         Ok(MutationOccurred::Yes(value)) | Ok(MutationOccurred::No(value)) => {
-        //             value
-        //         }
-        //         Err(_metastore_error) => {
-        //             // Mark the batch as failed.
-        //             todo!();
-        //         }
-        //     };
-        //     let publish_notify = write_batch_state.publish_notify.clone();
-        //     (value, publish_notify)
-        // };
-
-        // let value = match mutate_fn(&mut write_batch_state.file_backed_index) {
-        //     Ok(MutationOccurred::Yes(value)) | Ok(MutationOccurred::No(value)) => {
-        //         return Ok(value);
-        //     }
-        //     Err(_metastore_error) => {
-        //         // Mark the batch as failed.
-        //         todo!();
-        //     }
-        // };
-        // locked_index.write_batch_state = Some(write_batch_state);
-        // }
-
-        // let mut index = locked_index.file_backed_index.clone();
-        // let value = match mutate_fn(&mut index)? {
-        //     MutationOccurred::Yes(value) => value,
-        //     MutationOccurred::No(value) => {
-        //         return Ok(value);
-        //     }
-        // };
-        // locked_index.set_recently_modified();
-
-        // let put_result = put_index(&*self.storage, &index).await;
-        // match put_result {
-        //     Ok(()) => {
-        //         *locked_index = index;
-        //         Ok(value)
-        //     }
-        //     Err(error) => {
-        //         // For some of the error type here, we cannot know for sure
-        //         // whether the content was written or not.
-        //         //
-        //         // Just to be sure, let's discard the cache.
-        //         let mut state_wlock_guard = self.state.write().await;
-
-        //         // At this point, we hold both locks.
-        //         state_wlock_guard.indexes.insert(
-        //             index_id.to_string(),
-        //             LazyIndexStatus::Active(LazyFileBackedIndex::new(
-        //                 self.storage.clone(),
-        //                 index_id.to_string(),
-        //                 self.polling_interval_opt,
-        //                 None,
-        //             )),
-        //         );
-        //         locked_index.discarded = true;
-        //         Err(error)
-        //     }
-        // }
     }
 
-    async fn read<T, F>(&self, index_uid: &IndexUid, view: F) -> MetastoreResult<T>
-    where F: FnOnce(&FileBackedIndex) -> MetastoreResult<T> {
-        let index_id = &index_uid.index_id;
-        let locked_index = self.get_locked_index(index_id).await?;
-        if locked_index.file_backed_index.index_uid() == index_uid {
-            view(&locked_index.file_backed_index)
-        } else {
-            Err(MetastoreError::NotFound(EntityKind::Index {
-                index_id: index_id.to_string(),
-            }))
-        }
-    }
-
-    async fn read_any<T, F>(&self, index_id: &str, view: F) -> MetastoreResult<T>
-    where F: FnOnce(&FileBackedIndex) -> MetastoreResult<T> {
-        let locked_index = self.get_locked_index(index_id).await?;
-        view(&locked_index.file_backed_index)
-    }
+    // Schedules a new push of the index after `after_duration_opt`.
+    async fn schedule_push_index(
+        &self,
+        index_id: &str,
+        after_duration_opt: Option<Duration>,
+    ) -> MetastoreResult<UploadTask> {
+        let storage: Arc<dyn Storage> = self.storage.clone();
+        let index_lock = self.index_cell(index_id).await?.writer.clone();
+        let (tx, rx) = broadcast_oneshot_channel::<MetastoreResult<()>>();
 
-    async fn push(&self, index_id: &str) -> MetastoreResult<()> {
-        todo!();
+        let task_handle = tokio::task::spawn(async move {
+            if let Some(remaining_until_end_of_cooldown) = after_duration_opt {
+                tokio::time::sleep(remaining_until_end_of_cooldown).await;
+            }
+
+            let mut locked_index = index_lock.lock().await;
+            let put_index_result = put_index(&*storage, &locked_index.write_state).await;
+
+            if put_index_result.is_err() {
+                // Depending on the error, the write may have been successful. We mark the current
+                // index as discarded to make sure we reload the data from the metastore before
+                // using it.
+                locked_index.discard();
+            } else {
+                locked_index.publish();
+            }
+            tx.send(put_index_result);
+
+            locked_index.last_push = Instant::now();
+            locked_index.upload_task = None;
+        });
+        Ok(UploadTask {
+            upload_result: rx,
+            task_handle,
+        })
     }
 
     /// Returns a valid locked index.
@@ -460,16 +392,45 @@ impl FileBackedMetastore {
     async fn get_locked_index(
         &self,
         index_id: &str,
-    ) -> MetastoreResult<OwnedMutexGuard<FileBackedIndexCell>> {
+    ) -> MetastoreResult<OwnedMutexGuard<FileBackedIndexWriter>> {
         loop {
-            let index = self.index(index_id).await?;
-            let locked_index = index.lock_owned().await;
+            let index = self.index_cell(index_id).await?;
+            let locked_index = index.writer.lock_owned().await;
             if !locked_index.discarded {
                 return Ok(locked_index);
             }
         }
     }
 
+    /// Returns the latest published view of the index.
+    ///
+    /// This function guarantees that it has not been
+    /// marked as discarded.
+    async fn get_view_from_id(&self, index_id: &str) -> MetastoreResult<Arc<FileBackedIndex>> {
+        loop {
+            let index = self.index_cell(index_id).await?;
+            let view_opt = index.published_view_rx.borrow().clone();
+            if let Some(view) = view_opt {
+                return Ok(view);
+            }
+        }
+    }
+
+    async fn get_view_from_index_uid(
+        &self,
+        index_uid: &IndexUid,
+    ) -> MetastoreResult<Arc<FileBackedIndex>> {
+        let index_id = &index_uid.index_id;
+        let index: Arc<FileBackedIndex> = self.get_view_from_id(index_id).await?;
+        if index.index_uid() == index_uid {
+            Ok(index)
+        } else {
+            Err(MetastoreError::NotFound(EntityKind::Index {
+                index_id: index_id.to_string(),
+            }))
+        }
+    }
+
     /// Returns a FileBackedIndex for the given index_id.
     ///
     /// If `index_id` is in a transitioning state `Creating` or `Deleting`, it will
     /// return an error.
     ///
     /// If an index is stored in the metastore state but is marked as discarded,
     /// it is not returned, and it is treated as if it was not there in the first place.
-    async fn index(&self, index_id: &str) -> MetastoreResult<Arc<Mutex<FileBackedIndexCell>>> {
+    async fn index_cell(&self, index_id: &str) -> MetastoreResult<FileBackedIndexCell> {
         {
             // Happy path!
             // If the object is already in our cache then we just return a copy
             {
                 let inner_rlock_guard = self.state.read().await;
                 if let Some(index_state) = inner_rlock_guard.indexes.get(index_id) {
-                    let index_mutex = get_index_mutex(index_id, index_state).await?;
-                    // TODO this could be a long lock to acquire.
-                    // We need to find a way to work around this.
-                    let index_discarded = index_mutex.lock().await.discarded;
+                    let index_cell = get_index_cell(index_id, index_state).await?;
+                    let index_discarded = index_cell.is_discarded();
                     if !index_discarded {
-                        return Ok(index_mutex);
+                        return Ok(index_cell);
                     }
                 }
             }
@@ -514,10 +475,10 @@ impl FileBackedMetastore {
             // the map. We want to avoid two copies to exist in the application, so we keep only
             // one.
             if let Some(index_state) = state_wlock_guard.indexes.get(index_id) {
-                let index_mutex = get_index_mutex(index_id, index_state).await?;
-                let discarded = index_mutex.lock().await.discarded;
+                let index_cell = get_index_cell(index_id, index_state).await?;
+                let discarded = index_cell.is_discarded();
                 if !discarded {
-                    return Ok(index_mutex);
+                    return Ok(index_cell);
                 }
             }
 
@@ -552,17 +513,16 @@ impl FileBackedMetastore {
                     message,
                     cause: "".to_string(),
                 };
-                return Err((metastore_error, index_id_opt, index_uid_opt));
+                return Err((metastore_error, index_id_opt.clone(), index_uid_opt));
             };
-            let index_metadata = match self
-                .read_any(index_id, |index| Ok(index.metadata().clone()))
+            let index_view = self
+                .get_view_from_id(index_id)
                 .await
-            {
-                Ok(index_metadata) => index_metadata,
-                Err(metastore_error) => {
-                    return Err((metastore_error, index_id_opt, index_uid_opt));
-                }
-            };
+                .map_err(|metastore_err| {
+                    (metastore_err, index_id_opt.clone(), index_uid_opt.clone())
+                })?;
+            let index_metadata = index_view.metadata().clone();
+
             if let Some(index_uid) = &index_uid_opt {
                 if index_metadata.index_uid != *index_uid {
                     let metastore_error = MetastoreError::NotFound(EntityKind::Index {
@@ -580,10 +540,9 @@ impl FileBackedMetastore {
         let list_splits_query = request.deserialize_list_splits_query()?;
         let mut all_splits = Vec::new();
         for index_uid in &list_splits_query.index_uids {
-            let splits = match self
-                .read(index_uid, |index| index.list_splits(&list_splits_query))
-                .await
-            {
+            let index_res = self.get_view_from_index_uid(index_uid).await;
+            let splits_res = index_res.and_then(|index| index.list_splits(&list_splits_query));
+            let splits = match splits_res {
                 Ok(splits) => splits,
                 Err(MetastoreError::NotFound(_)) => {
                     // If the index does not exist, we just skip it.
                     continue;
                 }
@@ -595,12 +554,6 @@ impl FileBackedMetastore {
         }
         Ok(all_splits)
     }
-
-    /// Helper used for testing to obtain the data associated with the given index.
-    #[cfg(test)]
-    async fn get_index(&self, index_uid: &IndexUid) -> MetastoreResult<FileBackedIndex> {
-        self.read(index_uid, |index| Ok(index.clone())).await
-    }
 }
 
 #[async_trait]
@@ -1092,12 +1045,10 @@ impl MetastoreService for FileBackedMetastore {
     async fn list_shards(&self, request: ListShardsRequest) -> MetastoreResult<ListShardsResponse> {
         let mut subresponses = Vec::with_capacity(request.subrequests.len());
-
         for subrequest in request.subrequests {
-            let index_uid = subrequest.index_uid().clone();
-            let subresponse = self
-                .read(&index_uid, |index| index.list_shards(subrequest))
-                .await?;
+            let index_uid = subrequest.index_uid();
+            let index_view = self.get_view_from_index_uid(&index_uid).await?;
+            let subresponse = index_view.list_shards(subrequest)?;
             subresponses.push(subresponse);
         }
         let response = ListShardsResponse { subresponses };
@@ -1111,9 +1062,8 @@ impl MetastoreService for FileBackedMetastore {
         &self,
         request: LastDeleteOpstampRequest,
     ) -> MetastoreResult<LastDeleteOpstampResponse> {
-        let last_delete_opstamp = self
-            .read(request.index_uid(), |index| Ok(index.last_delete_opstamp()))
-            .await?;
+        let index_view = self.get_view_from_index_uid(request.index_uid()).await?;
+        let last_delete_opstamp = index_view.last_delete_opstamp();
         Ok(LastDeleteOpstampResponse::new(last_delete_opstamp))
     }
 
@@ -1154,14 +1104,9 @@ impl MetastoreService for FileBackedMetastore {
         request: ListDeleteTasksRequest,
     ) -> MetastoreResult<ListDeleteTasksResponse> {
         let index_uid = request.index_uid();
-
-        let delete_tasks = self
-            .read(index_uid, |index| {
-                Ok(index.list_delete_tasks(request.opstamp_start))
-            })
-            .await??;
-        let response = ListDeleteTasksResponse { delete_tasks };
-        Ok(response)
+        let index_view = self.get_view_from_index_uid(index_uid).await?;
+        let delete_tasks = index_view.list_delete_tasks(request.opstamp_start)?;
+        Ok(ListDeleteTasksResponse { delete_tasks })
     }
 
     // Index Template API
@@ -1326,10 +1271,10 @@ impl MetastoreService for FileBackedMetastore {
 
 impl MetastoreServiceExt for FileBackedMetastore {}
 
-async fn get_index_mutex(
+async fn get_index_cell(
     index_id: &str,
     lazy_index_status: &LazyIndexStatus,
-) -> MetastoreResult<Arc<Mutex<FileBackedIndexCell>>> {
+) -> MetastoreResult<FileBackedIndexCell> {
     match lazy_index_status {
         LazyIndexStatus::Active(lazy_index) => lazy_index.get().await,
         LazyIndexStatus::Creating => Err(MetastoreError::Internal {
@@ -1427,12 +1372,8 @@ mod tests {
     }
 
     async fn list_all_shards(&self, index_uid: &IndexUid, source_id: &SourceId) -> Vec<Shard> {
-        self.read(index_uid, |index| {
-            let shards = index.list_all_shards(source_id);
-            Ok(shards)
-        })
-        .await
-        .unwrap()
+        let index_view = self.get_view_from_index_uid(index_uid).await.unwrap();
+        index_view.list_all_shards(source_id)
     }
 }
 
@@ -1499,7 +1440,7 @@ mod tests {
             .clone();
 
         // Open index and check its metadata
-        let created_index = metastore.get_index(&index_uid).await.unwrap();
+        let created_index = metastore.get_view_from_index_uid(&index_uid).await.unwrap();
         assert_eq!(created_index.index_id(), index_config.index_id);
         assert_eq!(
             created_index.metadata().index_uri(),
@@ -1518,14 +1459,14 @@ mod tests {
 
         // Open a non-existent index.
        let metastore_error = metastore
-            .get_index(&IndexUid::new_with_random_ulid("index-does-not-exist"))
+            .get_view_from_index_uid(&IndexUid::new_with_random_ulid("index-does-not-exist"))
            .await
            .unwrap_err();
        assert!(matches!(metastore_error, MetastoreError::NotFound { .. }));

        // Open an index with a different incarnation_id.
        let metastore_error = metastore
-            .get_index(&IndexUid::new_with_random_ulid(index_id))
+            .get_view_from_index_uid(&IndexUid::new_with_random_ulid(index_id))
            .await
            .unwrap_err();
        assert!(matches!(metastore_error, MetastoreError::NotFound { .. }));
@@ -1649,7 +1590,7 @@ mod tests {

        // Getting index with inconsistent index ID should raise an error.
        let metastore_error = metastore
-            .get_index(&IndexUid::new_with_random_ulid(index_id))
+            .get_view_from_index_uid(&IndexUid::new_with_random_ulid(index_id))
            .await
            .unwrap_err();
        assert!(matches!(metastore_error, MetastoreError::Internal { .. }));
@@ -1938,7 +1879,7 @@ mod tests {
        assert!(matches!(metastore_error, MetastoreError::Internal { .. }));

        // Try to fetch the index that was never created.
        let created_index_error = metastore
-            .get_index(&IndexUid::new_with_random_ulid(index_id))
+            .get_view_from_index_uid(&IndexUid::new_with_random_ulid(index_id))
            .await
            .unwrap_err();
        assert!(matches!(
            created_index_error,
            MetastoreError::NotFound { .. }
        ));
@@ -1989,7 +1930,10 @@ mod tests {
        assert!(matches!(metastore_error, MetastoreError::Internal { .. }));
        // Let's fetch the index; we expect an internal error as the index state is in `Creating`
        // state.
-        let created_index_error = metastore.get_index(&index_uid.clone()).await.unwrap_err();
+        let created_index_error = metastore
+            .get_view_from_index_uid(&index_uid.clone())
+            .await
+            .unwrap_err();
        assert!(matches!(
            created_index_error,
            MetastoreError::Internal { .. }
        ));
@@ -2013,7 +1957,10 @@ mod tests {
        let manifest = load_or_create_manifest(&*storage).await.unwrap();
        assert!(!manifest.indexes.contains_key(index_id));
        // Now we can expect an `IndexDoesNotExist` error.
-        let created_index_error = metastore.get_index(&index_uid).await.unwrap_err();
+        let created_index_error = metastore
+            .get_view_from_index_uid(&index_uid)
+            .await
+            .unwrap_err();
        assert!(matches!(
            created_index_error,
            MetastoreError::NotFound { .. }
        ));
@@ -2064,7 +2011,7 @@ mod tests {
        // Let's fetch the index; we expect an internal error as the index state is in `Creating`
        // state.
        let created_index_error = metastore
-            .get_index(&IndexUid::new_with_random_ulid(index_id))
+            .get_view_from_index_uid(&IndexUid::new_with_random_ulid(index_id))
            .await
            .unwrap_err();
        assert!(matches!(
            created_index_error,
            MetastoreError::Internal { .. }
        ));
@@ -2115,7 +2062,10 @@ mod tests {
        assert!(matches!(metastore_error, MetastoreError::Internal { .. }));
        // Let's fetch the index; we expect an internal error as the index state is in `Deleting`
        // state.
-        let created_index_error = metastore.get_index(&index_uid).await.unwrap_err();
+        let created_index_error = metastore
+            .get_view_from_index_uid(&index_uid)
+            .await
+            .unwrap_err();
        assert!(matches!(
            created_index_error,
            MetastoreError::Internal { .. }
        ));
@@ -2171,7 +2121,10 @@ mod tests {
        assert!(matches!(metastore_error, MetastoreError::Internal { .. }));
        // Let's fetch the index; we expect an internal error as the index state is in `Deleting`
        // state.
-        let created_index_error = metastore.get_index(&index_uid).await.unwrap_err();
+        let created_index_error = metastore
+            .get_view_from_index_uid(&index_uid)
+            .await
+            .unwrap_err();
        assert!(matches!(
            created_index_error,
            MetastoreError::Internal { .. }
        ));
@@ -2235,7 +2188,7 @@ mod tests {

        // Fetch the index metadata not registered in index states json.
        metastore
-            .get_index(&index_uid_unregistered.clone())
+            .get_view_from_index_uid(&index_uid_unregistered.clone())
            .await
            .unwrap();
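The mutate/schedule_push_index pair above debounces storage writes: the first mutation after the cooldown schedules a single upload task, and any mutation that lands while that task is pending simply subscribes to the same result. Quickwit's `broadcast_oneshot_channel` helper is internal to the crate, so the sketch below substitutes `tokio::sync::broadcast`; the `Uploader` type and the fixed 100 ms delay are toy stand-ins, not the patch's actual types or timings:

    use std::sync::Arc;
    use std::time::Duration;
    use tokio::sync::{broadcast, Mutex};

    // Toy debounced uploader: concurrent writers share one pending push.
    struct Uploader {
        pending: Option<broadcast::Sender<Result<(), String>>>,
    }

    async fn push_debounced(uploader: Arc<Mutex<Uploader>>) -> Result<(), String> {
        let mut rx = {
            let mut locked = uploader.lock().await;
            if let Some(tx) = &locked.pending {
                // A push is already scheduled: subscribe to its result.
                tx.subscribe()
            } else {
                // First writer after the cooldown: schedule the push.
                let (tx, rx) = broadcast::channel(1);
                locked.pending = Some(tx.clone());
                let uploader_clone = Arc::clone(&uploader);
                tokio::spawn(async move {
                    // Stand-ins for the cooldown and the storage PUT.
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    let put_result: Result<(), String> = Ok(());
                    uploader_clone.lock().await.pending = None;
                    let _ = tx.send(put_result);
                });
                rx
            }
        };
        rx.recv().await.map_err(|_| "upload task cancelled".to_string())?
    }

    #[tokio::main]
    async fn main() {
        let uploader = Arc::new(Mutex::new(Uploader { pending: None }));
        // Both calls land within the cooldown window and share one push.
        let (a, b) = tokio::join!(
            push_debounced(Arc::clone(&uploader)),
            push_debounced(Arc::clone(&uploader))
        );
        assert!(a.is_ok() && b.is_ok());
    }

This is why the patch awaits the upload result with a double `?` in `mutate`: the outer layer reports a cancelled or dropped upload task, the inner layer reports the storage PUT failure itself.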