Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debounced file based metastore write #5388

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use tokio::sync::watch;

/// Sending half of a broadcast oneshot channel: broadcasts a single value,
/// waking every [`Receiver`] waiting on it.
pub struct Sender<T> {
    // Underlying watch channel; holds `None` until the value is sent.
    watch_tx: watch::Sender<Option<T>>,
}

impl<T> Sender<T> {
    /// Broadcasts `obj` to every receiver of this channel.
    ///
    /// If all receivers have already been dropped, the value is silently
    /// discarded: there is nobody left to notify.
    pub fn send(&self, obj: T) {
        // `watch::Sender::send` only errors when no receiver remains;
        // that error is deliberately ignored.
        self.watch_tx.send(Some(obj)).ok();
    }
}

/// Receiving half of a broadcast oneshot channel. It is cloneable, so
/// several tasks can each await the (single) broadcast value.
#[derive(Clone)]
pub struct Receiver<T> {
    // Underlying watch channel; observes `Some(value)` once it is sent.
    watch_rx: watch::Receiver<Option<T>>,
}

/// Error returned by [`Receiver::receive`] when the [`Sender`] was dropped
/// before broadcasting a value.
#[derive(Debug)]
pub struct Cancelled;

// Implementing `Display` and `Error` lets callers propagate `Cancelled`
// with `?` into `Box<dyn Error>`/`anyhow::Error` and log it meaningfully.
impl std::fmt::Display for Cancelled {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "sender was dropped before sending a value")
    }
}

impl std::error::Error for Cancelled {}

impl<T: Clone> Receiver<T> {
    /// Waits until the sender broadcasts a value and returns a clone of it.
    ///
    /// Returns `Err(Cancelled)` if the [`Sender`] is dropped before any
    /// value is sent.
    pub async fn receive(mut self) -> Result<T, Cancelled> {
        // `wait_for` errors out if the sender is dropped while the channel
        // still holds `None`.
        let value_ref = self
            .watch_rx
            .wait_for(Option::is_some)
            .await
            .map_err(|_| Cancelled)?;
        // `wait_for` only resolves once the predicate holds, so the value
        // is guaranteed to be `Some` at this point.
        let value = value_ref
            .as_ref()
            .expect("the watched value should be set")
            .clone();
        Ok(value)
    }
}

/// Creates a oneshot channel whose single broadcast value can be awaited by
/// multiple (cloned) receivers.
pub fn broadcast_oneshot_channel<T>() -> (Sender<T>, Receiver<T>) {
    // The watch channel starts out empty (`None`); the sender fills it
    // exactly once with `Some(value)`.
    let (watch_tx, watch_rx) = watch::channel(None);
    (Sender { watch_tx }, Receiver { watch_rx })
}

#[cfg(test)]
mod tests {
    use super::broadcast_oneshot_channel;

    // The value sent once must be observed by every receiver, including
    // clones awaiting on other tasks.
    #[tokio::test]
    async fn test_broadcast_oneshot_channel_rx() {
        let (tx, rx) = broadcast_oneshot_channel::<u32>();
        let rx_clone = rx.clone();
        let spawned_rx = tokio::spawn(async move { rx_clone.receive().await.unwrap() });
        tx.send(42);
        assert_eq!(rx.receive().await.unwrap(), 42);
        assert_eq!(spawned_rx.await.unwrap(), 42);
    }

    // Dropping the sender without sending must cancel every pending receive.
    #[tokio::test]
    async fn test_broadcast_oneshot_channel_rx_cancel() {
        let (tx, rx) = broadcast_oneshot_channel::<u32>();
        let rx_clone = rx.clone();
        let spawned_rx = tokio::spawn(async move { rx_clone.receive().await });
        drop(tx);
        assert!(rx.receive().await.is_err());
        assert!(spawned_rx.await.unwrap().is_err());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,6 @@ pub(crate) struct FileBackedIndex {
delete_tasks: Vec<DeleteTask>,
/// Stamper.
stamper: Stamper,
/// Flag used to avoid polling the metastore if
/// the process is actually writing the metastore.
///
/// The logic is "soft". We avoid the polling step
/// if the metastore wrote some value since the last
/// polling loop.
recently_modified: bool,
/// Has been discarded. This field exists to make
/// it possible to discard this entry if there is an error
/// while mutating the Index.
pub discarded: bool,
}

#[cfg(any(test, feature = "testsuite"))]
Expand Down Expand Up @@ -153,8 +142,6 @@ impl From<IndexMetadata> for FileBackedIndex {
per_source_shards,
delete_tasks: Default::default(),
stamper: Default::default(),
recently_modified: false,
discarded: false,
}
}
}
Expand Down Expand Up @@ -189,21 +176,9 @@ impl FileBackedIndex {
per_source_shards,
delete_tasks,
stamper: Stamper::new(last_opstamp),
recently_modified: false,
discarded: false,
}
}

/// Sets the `recently_modified` flag to false and returns the previous value.
pub fn flip_recently_modified_down(&mut self) -> bool {
std::mem::replace(&mut self.recently_modified, false)
}

/// Marks the file as `recently_modified`.
pub fn set_recently_modified(&mut self) {
self.recently_modified = true;
}

/// Index ID accessor.
pub fn index_id(&self) -> &str {
self.metadata.index_id()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ use tracing::error;

use super::file_backed_index::FileBackedIndex;
use super::store_operations::{load_index, METASTORE_FILE_NAME};
use super::{FileBackedIndexCell, FileBackedIndexWriter};

/// Lazy [`FileBackedIndex`]. It loads a `FileBackedIndex` on demand. When the index is first
/// loaded, it optionally spawns a task to periodically poll the storage and update the index.
pub(crate) struct LazyFileBackedIndex {
index_id: IndexId,
storage: Arc<dyn Storage>,
polling_interval_opt: Option<Duration>,
lazy_index: OnceCell<Arc<Mutex<FileBackedIndex>>>,
lazy_index: OnceCell<FileBackedIndexCell>,
}

impl LazyFileBackedIndex {
Expand All @@ -46,15 +47,15 @@ impl LazyFileBackedIndex {
polling_interval_opt: Option<Duration>,
file_backed_index: Option<FileBackedIndex>,
) -> Self {
let index_mutex_opt = file_backed_index.map(|index| Arc::new(Mutex::new(index)));
let index_mutex_opt = file_backed_index.map(FileBackedIndexCell::new);
// If the polling interval is configured and the index is already loaded,
// immediately spawn the polling task
if let Some(index_mutex) = &index_mutex_opt {
if let Some(polling_interval) = polling_interval_opt {
spawn_index_metadata_polling_task(
storage.clone(),
index_id.clone(),
Arc::downgrade(index_mutex),
Arc::downgrade(&index_mutex.writer),
polling_interval,
);
}
Expand All @@ -69,22 +70,23 @@ impl LazyFileBackedIndex {

/// Gets a synchronized `FileBackedIndex`. If the index wasn't provided on creation, we load it
/// lazily on the first call of this method.
pub async fn get(&self) -> MetastoreResult<Arc<Mutex<FileBackedIndex>>> {
pub(crate) async fn get(&self) -> MetastoreResult<FileBackedIndexCell> {
self.lazy_index
.get_or_try_init(|| async move {
let index = load_index(&*self.storage, &self.index_id).await?;
let index_mutex = Arc::new(Mutex::new(index));
let file_backed_index_cell = FileBackedIndexCell::new(index);
let file_backed_index_writer = Arc::downgrade(&file_backed_index_cell.writer);
// When the index is loaded lazily, the polling task is not started in the
// constructor so we do it here when the index is actually loaded.
if let Some(polling_interval) = self.polling_interval_opt {
spawn_index_metadata_polling_task(
self.storage.clone(),
self.index_id.clone(),
Arc::downgrade(&index_mutex),
file_backed_index_writer,
polling_interval,
);
}
Ok(index_mutex)
Ok(file_backed_index_cell)
})
.await
.cloned()
Expand All @@ -94,17 +96,22 @@ impl LazyFileBackedIndex {
async fn poll_index_metadata_once(
storage: &dyn Storage,
index_id: &str,
index_mutex: &Mutex<FileBackedIndex>,
index_writer: &Mutex<FileBackedIndexWriter>,
) {
let mut locked_index = index_mutex.lock().await;
if locked_index.flip_recently_modified_down() {
let mut locked_index = index_writer.lock().await;
if locked_index.upload_task.is_none() {
return;
}
// TODO: cool-down period.
if locked_index.last_push.elapsed() < Duration::from_secs(30) {
return;
}
let load_index_result = load_index(storage, index_id).await;

match load_index_result {
Ok(index) => {
*locked_index = index;
locked_index.write_state = index;
locked_index.publish();
}
Err(MetastoreError::NotFound(EntityKind::Index { .. })) => {
// The index has been deleted by the file-backed metastore holding a reference to this
Expand All @@ -125,17 +132,17 @@ async fn poll_index_metadata_once(
fn spawn_index_metadata_polling_task(
storage: Arc<dyn Storage>,
index_id: IndexId,
metastore_weak: Weak<Mutex<FileBackedIndex>>,
metastore_weak: Weak<Mutex<FileBackedIndexWriter>>,
polling_interval: Duration,
) {
tokio::task::spawn(async move {
let mut interval = tokio::time::interval(polling_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
interval.tick().await; //< this is to prevent a fetch right after the first population of the data.

while let Some(metadata_mutex) = metastore_weak.upgrade() {
while let Some(metadata_writer) = metastore_weak.upgrade() {
interval.tick().await;
poll_index_metadata_once(&*storage, &index_id, &metadata_mutex).await;
poll_index_metadata_once(&*storage, &index_id, &*metadata_writer).await;
}
});
}
Loading
Loading