From 8b371724018cff081fdf3e917a9d7827829bd62c Mon Sep 17 00:00:00 2001 From: Sergii Mikhtoniuk Date: Mon, 18 Mar 2024 16:30:55 -0700 Subject: [PATCH 1/2] Add LFS metadata caching option for S3 dataset repo --- .../src/repos/dataset_repository_local_fs.rs | 40 ++-- .../core/src/repos/dataset_repository_s3.rs | 112 +++++++++-- src/infra/core/src/repos/mod.rs | 2 + .../object_repository_caching_local_fs.rs | 174 ++++++++++++++++++ 4 files changed, 303 insertions(+), 25 deletions(-) create mode 100644 src/infra/core/src/repos/object_repository_caching_local_fs.rs diff --git a/src/infra/core/src/repos/dataset_repository_local_fs.rs b/src/infra/core/src/repos/dataset_repository_local_fs.rs index 05f5038b0b..de08bebd65 100644 --- a/src/infra/core/src/repos/dataset_repository_local_fs.rs +++ b/src/infra/core/src/repos/dataset_repository_local_fs.rs @@ -82,11 +82,25 @@ impl DatasetRepositoryLocalFs { )) } + fn build_dataset(layout: DatasetLayout, event_bus: Arc) -> Arc { + Arc::new(DatasetImpl::new( + event_bus, + MetadataChainImpl::new( + MetadataBlockRepositoryCachingInMem::new(MetadataBlockRepositoryImpl::new( + ObjectRepositoryLocalFSSha3::new(layout.blocks_dir), + )), + ReferenceRepositoryImpl::new(NamedObjectRepositoryLocalFS::new(layout.refs_dir)), + ), + ObjectRepositoryLocalFSSha3::new(layout.data_dir), + ObjectRepositoryLocalFSSha3::new(layout.checkpoints_dir), + NamedObjectRepositoryLocalFS::new(layout.info_dir), + )) + } + // TODO: Make dataset factory (and thus the hashing algo) configurable - fn get_dataset_impl(&self, dataset_handle: &DatasetHandle) -> impl Dataset { + fn get_dataset_impl(&self, dataset_handle: &DatasetHandle) -> Arc { let layout = DatasetLayout::new(self.storage_strategy.get_dataset_path(dataset_handle)); - - DatasetFactoryImpl::get_local_fs(layout, self.event_bus.clone()) + Self::build_dataset(layout, self.event_bus.clone()) } // TODO: Used only for testing, but should be removed it in future to discourage @@ -173,7 +187,7 @@ impl DatasetRepository for DatasetRepositoryLocalFs { ) -> Result, GetDatasetError> { let dataset_handle = self.resolve_dataset_ref(dataset_ref).await?; let dataset = self.get_dataset_impl(&dataset_handle); - Ok(Arc::new(dataset)) + Ok(dataset) } async fn create_dataset( @@ -226,14 +240,11 @@ impl DatasetRepository for DatasetRepositoryLocalFs { } // It's okay to create a new dataset by this point - let dataset_id = seed_block.event.dataset_id.clone(); let dataset_handle = DatasetHandle::new(dataset_id, dataset_alias.clone()); let dataset_path = self.storage_strategy.get_dataset_path(&dataset_handle); - let layout = DatasetLayout::create(&dataset_path).int_err()?; - - let dataset = DatasetFactoryImpl::get_local_fs(layout, self.event_bus.clone()); + let dataset = Self::build_dataset(layout, self.event_bus.clone()); // There are three possibilities at this point: // - Dataset did not exist before - continue normally @@ -259,7 +270,7 @@ impl DatasetRepository for DatasetRepositoryLocalFs { }; self.storage_strategy - .handle_dataset_created(&dataset, &dataset_handle.alias) + .handle_dataset_created(dataset.as_ref(), &dataset_handle.alias) .await?; tracing::info!( @@ -277,7 +288,7 @@ impl DatasetRepository for DatasetRepositoryLocalFs { Ok(CreateDatasetResult { dataset_handle, - dataset: Arc::new(dataset), + dataset, head, }) } @@ -467,7 +478,7 @@ impl DatasetSingleTenantStorageStrategy { dataset_alias: &DatasetAlias, ) -> Result { let layout = DatasetLayout::new(dataset_path); - let dataset = DatasetFactoryImpl::get_local_fs(layout, 
self.event_bus.clone()); + let dataset = DatasetRepositoryLocalFs::build_dataset(layout, self.event_bus.clone()); dataset .get_summary(GetSummaryOpts::default()) .await @@ -643,7 +654,7 @@ impl DatasetMultiTenantStorageStrategy { dataset_id: &DatasetID, ) -> Result { let layout = DatasetLayout::new(dataset_path); - let dataset = DatasetFactoryImpl::get_local_fs(layout, self.event_bus.clone()); + let dataset = DatasetRepositoryLocalFs::build_dataset(layout, self.event_bus.clone()); match dataset.as_info_repo().get("alias").await { Ok(bytes) => { let dataset_alias_str = std::str::from_utf8(&bytes[..]).int_err()?.trim(); @@ -871,11 +882,12 @@ impl DatasetStorageStrategy for DatasetMultiTenantStorageStrategy { ) -> Result<(), InternalError> { let dataset_path = self.get_dataset_path(dataset_handle); let layout = DatasetLayout::new(dataset_path); - let dataset = DatasetFactoryImpl::get_local_fs(layout, self.event_bus.clone()); + let dataset = DatasetRepositoryLocalFs::build_dataset(layout, self.event_bus.clone()); let new_alias = DatasetAlias::new(dataset_handle.alias.account_name.clone(), new_name.clone()); - self.save_dataset_alias(&dataset, &new_alias).await?; + self.save_dataset_alias(dataset.as_ref(), &new_alias) + .await?; Ok(()) } diff --git a/src/infra/core/src/repos/dataset_repository_s3.rs b/src/infra/core/src/repos/dataset_repository_s3.rs index 45f73c6b82..b98e5fb310 100644 --- a/src/infra/core/src/repos/dataset_repository_s3.rs +++ b/src/infra/core/src/repos/dataset_repository_s3.rs @@ -7,6 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::path::PathBuf; use std::sync::Arc; use async_trait::async_trait; @@ -17,13 +18,11 @@ use kamu_core::*; use opendatafabric::*; use url::Url; -use super::DatasetFactoryImpl; -use crate::create_dataset_from_snapshot_impl; use crate::utils::s3_context::S3Context; +use crate::*; ///////////////////////////////////////////////////////////////////////////////////////// -#[component(pub)] pub struct DatasetRepositoryS3 { s3_context: S3Context, current_account_subject: Arc, @@ -31,10 +30,12 @@ pub struct DatasetRepositoryS3 { dependency_graph_service: Arc, event_bus: Arc, multi_tenant: bool, + metadata_cache_local_fs_path: Option>, } ///////////////////////////////////////////////////////////////////////////////////////// +#[component(pub)] impl DatasetRepositoryS3 { pub fn new( s3_context: S3Context, @@ -43,6 +44,7 @@ impl DatasetRepositoryS3 { dependency_graph_service: Arc, event_bus: Arc, multi_tenant: bool, + metadata_cache_local_fs_path: Option>, ) -> Self { Self { s3_context, @@ -51,15 +53,102 @@ impl DatasetRepositoryS3 { dependency_graph_service, event_bus, multi_tenant, + metadata_cache_local_fs_path, } } - fn get_dataset_impl(&self, dataset_id: &DatasetID) -> Result { + fn get_dataset_impl(&self, dataset_id: &DatasetID) -> Result, InternalError> { let s3_context = self .s3_context .sub_context(&format!("{}/", &dataset_id.as_multibase())); - DatasetFactoryImpl::get_s3_from_context(s3_context, self.event_bus.clone()) + let client = s3_context.client; + let endpoint = s3_context.endpoint; + let bucket = s3_context.bucket; + let key_prefix = s3_context.key_prefix; + + // TODO: Consider switching DatasetImpl to dynamic dispatch to simplify + // configurability + if let Some(metadata_cache_local_fs_path) = &self.metadata_cache_local_fs_path { + Ok(Arc::new(DatasetImpl::new( + self.event_bus.clone(), + MetadataChainImpl::new( + 
MetadataBlockRepositoryCachingInMem::new(MetadataBlockRepositoryImpl::new( + ObjectRepositoryCachingLocalFs::new( + ObjectRepositoryS3Sha3::new(S3Context::new( + client.clone(), + endpoint.clone(), + bucket.clone(), + format!("{key_prefix}blocks/"), + )), + metadata_cache_local_fs_path.clone(), + ), + )), + ReferenceRepositoryImpl::new(NamedObjectRepositoryS3::new(S3Context::new( + client.clone(), + endpoint.clone(), + bucket.clone(), + format!("{key_prefix}refs/"), + ))), + ), + ObjectRepositoryS3Sha3::new(S3Context::new( + client.clone(), + endpoint.clone(), + bucket.clone(), + format!("{key_prefix}data/"), + )), + ObjectRepositoryS3Sha3::new(S3Context::new( + client.clone(), + endpoint.clone(), + bucket.clone(), + format!("{key_prefix}checkpoints/"), + )), + NamedObjectRepositoryS3::new(S3Context::new( + client.clone(), + endpoint.clone(), + bucket.clone(), + format!("{key_prefix}info/"), + )), + ))) + } else { + Ok(Arc::new(DatasetImpl::new( + self.event_bus.clone(), + MetadataChainImpl::new( + MetadataBlockRepositoryCachingInMem::new(MetadataBlockRepositoryImpl::new( + ObjectRepositoryS3Sha3::new(S3Context::new( + client.clone(), + endpoint.clone(), + bucket.clone(), + format!("{key_prefix}blocks/"), + )), + )), + ReferenceRepositoryImpl::new(NamedObjectRepositoryS3::new(S3Context::new( + client.clone(), + endpoint.clone(), + bucket.clone(), + format!("{key_prefix}refs/"), + ))), + ), + ObjectRepositoryS3Sha3::new(S3Context::new( + client.clone(), + endpoint.clone(), + bucket.clone(), + format!("{key_prefix}data/"), + )), + ObjectRepositoryS3Sha3::new(S3Context::new( + client.clone(), + endpoint.clone(), + bucket.clone(), + format!("{key_prefix}checkpoints/"), + )), + NamedObjectRepositoryS3::new(S3Context::new( + client.clone(), + endpoint.clone(), + bucket.clone(), + format!("{key_prefix}info/"), + )), + ))) + } } async fn delete_dataset_s3_objects(&self, dataset_id: &DatasetID) -> Result<(), InternalError> { @@ -107,7 +196,7 @@ impl DatasetRepositoryS3 { if let Ok(id) = DatasetID::from_multibase_string(&prefix) { let dataset = self.get_dataset_impl(&id)?; - let dataset_alias = self.resolve_dataset_alias(&dataset).await?; + let dataset_alias = self.resolve_dataset_alias(dataset.as_ref()).await?; if alias_filter(&dataset_alias) { let hdl = DatasetHandle::new(id, dataset_alias); yield hdl; @@ -181,7 +270,7 @@ impl DatasetRepository for DatasetRepositoryS3 { .await? 
{ let dataset = self.get_dataset_impl(id)?; - let dataset_alias = self.resolve_dataset_alias(&dataset).await?; + let dataset_alias = self.resolve_dataset_alias(dataset.as_ref()).await?; Ok(DatasetHandle::new(id.clone(), dataset_alias)) } else { Err(GetDatasetError::NotFound(DatasetNotFoundError { @@ -217,7 +306,7 @@ impl DatasetRepository for DatasetRepositoryS3 { ) -> Result, GetDatasetError> { let dataset_handle = self.resolve_dataset_ref(dataset_ref).await?; let dataset = self.get_dataset_impl(&dataset_handle.id)?; - Ok(Arc::new(dataset)) + Ok(dataset) } async fn create_dataset( @@ -298,7 +387,8 @@ impl DatasetRepository for DatasetRepositoryS3 { }; let normalized_alias = self.normalize_alias(dataset_alias); - self.save_dataset_alias(&dataset, normalized_alias).await?; + self.save_dataset_alias(dataset.as_ref(), normalized_alias) + .await?; let dataset_handle = DatasetHandle::new(dataset_id, dataset_alias.clone()); @@ -317,7 +407,7 @@ impl DatasetRepository for DatasetRepositoryS3 { Ok(CreateDatasetResult { dataset_handle, - dataset: Arc::new(dataset), + dataset, head, }) } @@ -358,7 +448,7 @@ impl DatasetRepository for DatasetRepositoryS3 { .await?; // It's safe to rename dataset - self.save_dataset_alias(&dataset, new_alias).await?; + self.save_dataset_alias(dataset.as_ref(), new_alias).await?; Ok(()) } diff --git a/src/infra/core/src/repos/mod.rs b/src/infra/core/src/repos/mod.rs index 5fad1adb67..45de1814e9 100644 --- a/src/infra/core/src/repos/mod.rs +++ b/src/infra/core/src/repos/mod.rs @@ -22,6 +22,7 @@ mod named_object_repository_in_memory; mod named_object_repository_ipfs_http; mod named_object_repository_local_fs; mod named_object_repository_s3; +mod object_repository_caching_local_fs; mod object_repository_http; mod object_repository_in_memory; mod object_repository_local_fs; @@ -46,6 +47,7 @@ pub use named_object_repository_in_memory::*; pub use named_object_repository_ipfs_http::*; pub use named_object_repository_local_fs::*; pub use named_object_repository_s3::*; +pub use object_repository_caching_local_fs::*; pub use object_repository_http::*; pub use object_repository_in_memory::*; pub use object_repository_local_fs::*; diff --git a/src/infra/core/src/repos/object_repository_caching_local_fs.rs b/src/infra/core/src/repos/object_repository_caching_local_fs.rs new file mode 100644 index 0000000000..f0af94cdab --- /dev/null +++ b/src/infra/core/src/repos/object_repository_caching_local_fs.rs @@ -0,0 +1,174 @@ +// Copyright Kamu Data, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::io::ErrorKind; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use kamu_core::*; +use opendatafabric::Multihash; +use tokio::io::{AsyncSeekExt, AsyncWriteExt}; +use url::Url; + +use crate::utils::s3_context::AsyncReadObj; + +///////////////////////////////////////////////////////////////////////////////////////// + +/// A read-through and (partially) a write-through caching layer for +/// [ObjectRepository] using a local file system. +/// +/// Currently caches objects forever, so a cache directory cleanup has to be +/// handled separately. 
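+///
+/// A minimal usage sketch (the wrapped repository, the cache directory, and the
+/// `hash` value below are illustrative placeholders):
+///
+/// ```ignore
+/// // Any `ObjectRepository` implementation can be wrapped; here an S3-backed one.
+/// let repo = ObjectRepositoryCachingLocalFs::new(
+///     ObjectRepositoryS3Sha3::new(s3_context),
+///     Arc::new(PathBuf::from("/tmp/kamu-s3-metadata-cache")),
+/// );
+///
+/// // Reads check the local cache first and fall back to (and populate from) S3.
+/// let bytes = repo.get_bytes(&hash).await?;
+/// ```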
+pub struct ObjectRepositoryCachingLocalFs { + wrapped: WrappedRepo, + cache_dir: Arc, +} + +///////////////////////////////////////////////////////////////////////////////////////// + +impl ObjectRepositoryCachingLocalFs { + pub fn new(wrapped: WrappedRepo, cache_dir: Arc) -> Self { + Self { wrapped, cache_dir } + } + + fn cache_path(&self, hash: &Multihash) -> PathBuf { + self.cache_dir.join(hash.as_multibase().to_stack_string()) + } + + fn ensure_cache_dir(&self) -> Result<(), std::io::Error> { + if !self.cache_dir.exists() { + std::fs::create_dir_all(self.cache_dir.as_ref())?; + } + Ok(()) + } +} + +///////////////////////////////////////////////////////////////////////////////////////// + +#[async_trait] +impl ObjectRepository for ObjectRepositoryCachingLocalFs +where + WrappedRepo: ObjectRepository, +{ + fn protocol(&self) -> ObjectRepositoryProtocol { + self.wrapped.protocol() + } + + async fn contains(&self, hash: &Multihash) -> Result { + let cache_path = self.cache_path(hash); + if cache_path.is_file() { + Ok(true) + } else { + self.wrapped.contains(hash).await + } + } + + async fn get_size(&self, hash: &Multihash) -> Result { + let cache_path = self.cache_path(hash); + match std::fs::metadata(&cache_path) { + Ok(meta) => Ok(meta.len()), + Err(err) if err.kind() == ErrorKind::NotFound => self.wrapped.get_size(hash).await, + Err(err) => Err(err.int_err().into()), + } + } + + async fn get_bytes(&self, hash: &Multihash) -> Result { + let cache_path = self.cache_path(hash); + match std::fs::read(&cache_path) { + Ok(data) => Ok(Bytes::from(data)), + Err(err) if err.kind() == ErrorKind::NotFound => { + let bytes = self.wrapped.get_bytes(hash).await?; + self.ensure_cache_dir().int_err()?; + std::fs::write(&cache_path, &bytes).int_err()?; + Ok(bytes) + } + Err(err) => Err(err.int_err().into()), + } + } + + async fn get_stream(&self, hash: &Multihash) -> Result, GetError> { + let cache_path = self.cache_path(hash); + + match tokio::fs::File::open(&cache_path).await { + Ok(file) => Ok(Box::new(file)), + Err(err) if err.kind() == ErrorKind::NotFound => { + let mut stream = self.wrapped.get_stream(hash).await?; + + let mut file = tokio::fs::File::create(cache_path).await.int_err()?; + tokio::io::copy(&mut stream, &mut file).await.int_err()?; + file.flush().await.int_err()?; + + file.seek(std::io::SeekFrom::Start(0)).await.int_err()?; + Ok(Box::new(file)) + } + Err(err) => Err(err.int_err().into()), + } + } + + async fn get_internal_url(&self, hash: &Multihash) -> Url { + self.wrapped.get_internal_url(hash).await + } + + async fn get_external_download_url( + &self, + hash: &Multihash, + opts: ExternalTransferOpts, + ) -> Result { + self.wrapped.get_external_download_url(hash, opts).await + } + + async fn get_external_upload_url( + &self, + hash: &Multihash, + opts: ExternalTransferOpts, + ) -> Result { + self.wrapped.get_external_upload_url(hash, opts).await + } + + async fn insert_bytes<'a>( + &'a self, + data: &'a [u8], + options: InsertOpts<'a>, + ) -> Result { + let res = self.wrapped.insert_bytes(data, options).await?; + let cache_path = self.cache_path(&res.hash); + std::fs::write(&cache_path, data).int_err()?; + Ok(res) + } + + async fn insert_stream<'a>( + &'a self, + src: Box, + options: InsertOpts<'a>, + ) -> Result { + // Will cache upon next read, for simplicity + self.wrapped.insert_stream(src, options).await + } + + async fn insert_file_move<'a>( + &'a self, + src: &Path, + options: InsertOpts<'a>, + ) -> Result { + // Will cache upon next read, for simplicity + 
self.wrapped.insert_file_move(src, options).await + } + + async fn delete(&self, hash: &Multihash) -> Result<(), DeleteError> { + let cache_path = self.cache_path(hash); + match std::fs::remove_file(&cache_path) { + Ok(()) => Ok(()), + Err(err) if err.kind() == ErrorKind::NotFound => Ok(()), + Err(err) => Err(err.int_err()), + }?; + self.wrapped.delete(hash).await + } +} From ed26a2aeb6811979928b3dd6ebd4b325d9ef6328 Mon Sep 17 00:00:00 2001 From: Sergii Mikhtoniuk Date: Mon, 18 Mar 2024 18:30:34 -0700 Subject: [PATCH 2/2] Add in-mem cache of dataset IDs and aliases for S3 repo --- CHANGELOG.md | 5 + .../core/src/repos/dataset_repository_s3.rs | 153 ++++++++++++++---- .../object_repository_caching_local_fs.rs | 4 +- 3 files changed, 125 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bdb3ce069c..c1109e120a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Unreleased +### Added +- Implementation of `ObjectRepository` that can cache small objects on local file system (e.g. to avoid too many calls to S3 repo) +- Optional `S3RegistryCache` component that can cache the list of datasets under an S3 repo to avoid very expensive bucket prefix listing calls + ## [0.166.1] - 2024-03-14 ### Fixed - Allow OData adapter to skip fields with unsupported data types instead of chasing diff --git a/src/infra/core/src/repos/dataset_repository_s3.rs b/src/infra/core/src/repos/dataset_repository_s3.rs index b98e5fb310..e93efa5d9e 100644 --- a/src/infra/core/src/repos/dataset_repository_s3.rs +++ b/src/infra/core/src/repos/dataset_repository_s3.rs @@ -11,11 +11,13 @@ use std::path::PathBuf; use std::sync::Arc; use async_trait::async_trait; +use chrono::{DateTime, Utc}; use dill::*; use event_bus::EventBus; use kamu_core::auth::{DatasetAction, DatasetActionAuthorizer, DEFAULT_ACCOUNT_NAME}; use kamu_core::*; use opendatafabric::*; +use tokio::sync::Mutex; use url::Url; use crate::utils::s3_context::S3Context; @@ -30,6 +32,7 @@ pub struct DatasetRepositoryS3 { dependency_graph_service: Arc, event_bus: Arc, multi_tenant: bool, + registry_cache: Option>, metadata_cache_local_fs_path: Option>, } @@ -37,6 +40,15 @@ pub struct DatasetRepositoryS3 { #[component(pub)] impl DatasetRepositoryS3 { + /// # Arguments + /// + /// * `registry_cache` - when present in the catalog enables in-memory cache + /// of the dataset IDs and aliases present in the repository, allowing to + /// avoid expensive bucket scanning + /// + /// * `metadata_cache_local_fs_path` - when specified enables the local FS + /// cache of metadata blocks, allowing to dramatically reduce the number + /// of requests to S3 pub fn new( s3_context: S3Context, current_account_subject: Arc, @@ -44,6 +56,7 @@ impl DatasetRepositoryS3 { dependency_graph_service: Arc, event_bus: Arc, multi_tenant: bool, + registry_cache: Option>, metadata_cache_local_fs_path: Option>, ) -> Self { Self { @@ -53,11 +66,12 @@ impl DatasetRepositoryS3 { dependency_graph_service, event_bus, multi_tenant, + registry_cache, metadata_cache_local_fs_path, } } - fn get_dataset_impl(&self, dataset_id: &DatasetID) -> Result, InternalError> { + fn get_dataset_impl(&self, dataset_id: &DatasetID) -> Arc { let s3_context = self .s3_context .sub_context(&format!("{}/", &dataset_id.as_multibase())); @@ -70,7 +84,7 @@ 
impl DatasetRepositoryS3 { // TODO: Consider switching DatasetImpl to dynamic dispatch to simplify // configurability if let Some(metadata_cache_local_fs_path) = &self.metadata_cache_local_fs_path { - Ok(Arc::new(DatasetImpl::new( + Arc::new(DatasetImpl::new( self.event_bus.clone(), MetadataChainImpl::new( MetadataBlockRepositoryCachingInMem::new(MetadataBlockRepositoryImpl::new( @@ -109,9 +123,9 @@ impl DatasetRepositoryS3 { bucket.clone(), format!("{key_prefix}info/"), )), - ))) + )) } else { - Ok(Arc::new(DatasetImpl::new( + Arc::new(DatasetImpl::new( self.event_bus.clone(), MetadataChainImpl::new( MetadataBlockRepositoryCachingInMem::new(MetadataBlockRepositoryImpl::new( @@ -147,7 +161,7 @@ impl DatasetRepositoryS3 { bucket.clone(), format!("{key_prefix}info/"), )), - ))) + )) } } @@ -171,7 +185,7 @@ impl DatasetRepositoryS3 { async fn save_dataset_alias( &self, dataset: &dyn Dataset, - dataset_alias: DatasetAlias, + dataset_alias: &DatasetAlias, ) -> Result<(), InternalError> { dataset .as_info_repo() @@ -182,26 +196,53 @@ impl DatasetRepositoryS3 { Ok(()) } + #[tracing::instrument(level = "info", skip_all)] + async fn list_datasets_in_s3(&self) -> Result, InternalError> { + let mut res = Vec::new(); + + let folders_common_prefixes = self.s3_context.bucket_list_folders().await?; + + for prefix in folders_common_prefixes { + let mut prefix = prefix.prefix.unwrap(); + while prefix.ends_with('/') { + prefix.pop(); + } + + if let Ok(id) = DatasetID::from_multibase_string(&prefix) { + let dataset = self.get_dataset_impl(&id); + let dataset_alias = self.resolve_dataset_alias(dataset.as_ref()).await?; + res.push(DatasetHandle::new(id, dataset_alias)); + } + } + + Ok(res) + } + + async fn list_datasets_maybe_cached(&self) -> Result, InternalError> { + if let Some(cache) = &self.registry_cache { + let mut cache = cache.state.lock().await; + + // Init cache + if cache.last_updated == DateTime::UNIX_EPOCH { + tracing::debug!("Initializing dataset registry cache"); + cache.datasets = self.list_datasets_in_s3().await?; + cache.last_updated = Utc::now(); + } + + Ok(cache.datasets.clone()) + } else { + self.list_datasets_in_s3().await + } + } + fn stream_datasets_if<'s>( &'s self, alias_filter: impl Fn(&DatasetAlias) -> bool + Send + 's, ) -> DatasetHandleStream<'s> { Box::pin(async_stream::try_stream! { - let folders_common_prefixes = self.s3_context.bucket_list_folders().await?; - for prefix in folders_common_prefixes { - let mut prefix = prefix.prefix.unwrap(); - while prefix.ends_with('/') { - prefix.pop(); - } - - if let Ok(id) = DatasetID::from_multibase_string(&prefix) { - let dataset = self.get_dataset_impl(&id)?; - let dataset_alias = self.resolve_dataset_alias(dataset.as_ref()).await?; - if alias_filter(&dataset_alias) { - let hdl = DatasetHandle::new(id, dataset_alias); - yield hdl; - } - + for hdl in self.list_datasets_maybe_cached().await? { + if alias_filter(&hdl.alias) { + yield hdl; } } }) @@ -269,7 +310,7 @@ impl DatasetRepository for DatasetRepositoryS3 { .bucket_path_exists(id.as_multibase().to_stack_string().as_str()) .await? 
{ - let dataset = self.get_dataset_impl(id)?; + let dataset = self.get_dataset_impl(id); let dataset_alias = self.resolve_dataset_alias(dataset.as_ref()).await?; Ok(DatasetHandle::new(id.clone(), dataset_alias)) } else { @@ -305,7 +346,7 @@ impl DatasetRepository for DatasetRepositoryS3 { dataset_ref: &DatasetRef, ) -> Result, GetDatasetError> { let dataset_handle = self.resolve_dataset_ref(dataset_ref).await?; - let dataset = self.get_dataset_impl(&dataset_handle.id)?; + let dataset = self.get_dataset_impl(&dataset_handle.id); Ok(dataset) } @@ -361,7 +402,7 @@ impl DatasetRepository for DatasetRepositoryS3 { // It's okay to create a new dataset by this point let dataset_id = seed_block.event.dataset_id.clone(); - let dataset = self.get_dataset_impl(&dataset_id)?; + let dataset = self.get_dataset_impl(&dataset_id); // There are three possibilities at this point: // - Dataset did not exist before - continue normally @@ -387,11 +428,17 @@ impl DatasetRepository for DatasetRepositoryS3 { }; let normalized_alias = self.normalize_alias(dataset_alias); - self.save_dataset_alias(dataset.as_ref(), normalized_alias) + self.save_dataset_alias(dataset.as_ref(), &normalized_alias) .await?; let dataset_handle = DatasetHandle::new(dataset_id, dataset_alias.clone()); + // Update cache if enabled + if let Some(cache) = &self.registry_cache { + let mut cache = cache.state.lock().await; + cache.datasets.push(dataset_handle.clone()); + } + self.event_bus .dispatch_event(events::DatasetEventCreated { dataset_id: dataset_handle.id.clone(), @@ -424,31 +471,37 @@ impl DatasetRepository for DatasetRepositoryS3 { dataset_ref: &DatasetRef, new_name: &DatasetName, ) -> Result<(), RenameDatasetError> { - let dataset_handle = self.resolve_dataset_ref(dataset_ref).await?; + let old_handle = self.resolve_dataset_ref(dataset_ref).await?; - let dataset = self.get_dataset_impl(&dataset_handle.id)?; + let dataset = self.get_dataset_impl(&old_handle.id); - let new_alias = - DatasetAlias::new(dataset_handle.alias.account_name.clone(), new_name.clone()); + let new_alias = DatasetAlias::new(old_handle.alias.account_name.clone(), new_name.clone()); // Check against possible name collisions match self.resolve_dataset_ref(&new_alias.as_local_ref()).await { Ok(_) => Err(RenameDatasetError::NameCollision(NameCollisionError { - alias: DatasetAlias::new( - dataset_handle.alias.account_name.clone(), - new_name.clone(), - ), + alias: DatasetAlias::new(old_handle.alias.account_name.clone(), new_name.clone()), })), Err(GetDatasetError::Internal(e)) => Err(RenameDatasetError::Internal(e)), Err(GetDatasetError::NotFound(_)) => Ok(()), }?; self.dataset_action_authorizer - .check_action_allowed(&dataset_handle, DatasetAction::Write) + .check_action_allowed(&old_handle, DatasetAction::Write) .await?; // It's safe to rename dataset - self.save_dataset_alias(dataset.as_ref(), new_alias).await?; + self.save_dataset_alias(dataset.as_ref(), &new_alias) + .await?; + + // Update cache if enabled + if let Some(cache) = &self.registry_cache { + let mut cache = cache.state.lock().await; + cache.datasets.retain(|h| h.id != old_handle.id); + cache + .datasets + .push(DatasetHandle::new(old_handle.id, new_alias)); + } Ok(()) } @@ -494,6 +547,12 @@ impl DatasetRepository for DatasetRepositoryS3 { .await .map_err(DeleteDatasetError::Internal)?; + // Update cache if enabled + if let Some(cache) = &self.registry_cache { + let mut cache = cache.state.lock().await; + cache.datasets.retain(|h| h.id != dataset_handle.id); + } + self.event_bus 
.dispatch_event(events::DatasetEventDeleted { dataset_id: dataset_handle.id, @@ -505,3 +564,27 @@ impl DatasetRepository for DatasetRepositoryS3 { } ///////////////////////////////////////////////////////////////////////////////////////// + +pub struct S3RegistryCache { + state: Arc>, +} + +struct State { + datasets: Vec, + last_updated: DateTime, +} + +#[component(pub)] +#[dill::scope(Singleton)] +impl S3RegistryCache { + pub fn new() -> Self { + Self { + state: Arc::new(Mutex::new(State { + datasets: Vec::new(), + last_updated: DateTime::UNIX_EPOCH, + })), + } + } +} + +///////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/infra/core/src/repos/object_repository_caching_local_fs.rs b/src/infra/core/src/repos/object_repository_caching_local_fs.rs index f0af94cdab..a159aceea2 100644 --- a/src/infra/core/src/repos/object_repository_caching_local_fs.rs +++ b/src/infra/core/src/repos/object_repository_caching_local_fs.rs @@ -23,7 +23,7 @@ use crate::utils::s3_context::AsyncReadObj; ///////////////////////////////////////////////////////////////////////////////////////// /// A read-through and (partially) a write-through caching layer for -/// [ObjectRepository] using a local file system. +/// [`ObjectRepository`] using a local file system. /// /// Currently caches objects forever, so a cache directory cleanup has to be /// handled separately. @@ -140,7 +140,7 @@ where ) -> Result { let res = self.wrapped.insert_bytes(data, options).await?; let cache_path = self.cache_path(&res.hash); - std::fs::write(&cache_path, data).int_err()?; + std::fs::write(cache_path, data).int_err()?; Ok(res) }
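
Usage sketch: with both new caching options enabled, `DatasetRepositoryS3` can be
constructed roughly as follows. The `s3_context`, `current_account_subject`,
`dataset_action_authorizer`, `dependency_graph_service`, and `event_bus` values are
assumed to come from the application's existing wiring, and the cache directory path
is a placeholder:

    // Shared, lazily populated cache of dataset handles (avoids bucket prefix listings).
    let registry_cache = Arc::new(S3RegistryCache::new());

    let repo = DatasetRepositoryS3::new(
        s3_context,
        current_account_subject,
        dataset_action_authorizer,
        dependency_graph_service,
        event_bus,
        true,                 // multi_tenant
        Some(registry_cache), // in-memory dataset ID/alias cache
        Some(Arc::new(PathBuf::from("/var/cache/kamu/s3-metadata-blocks"))),
    );

The registry cache is filled on the first dataset listing and kept in sync on create,
rename, and delete; the local FS block cache is read-through and never evicts, so the
chosen directory needs periodic cleanup outside of this component.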