Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(puffin): adjust generic parameters #4279

Merged
merged 3 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 7 additions & 13 deletions src/mito2/src/sst/index/puffin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use object_store::{FuturesAsyncReader, FuturesAsyncWriter, ObjectStore};
use puffin::error::{self as puffin_error, Result as PuffinResult};
use puffin::puffin_manager::file_accessor::PuffinFileAccessor;
use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager;
use puffin::puffin_manager::stager::{BoundedStager, FsBlobGuard, FsDirGuard};
use puffin::puffin_manager::stager::{BoundedStager, FsBlobGuard};
use puffin::puffin_manager::BlobGuard;
use snafu::ResultExt;

Expand All @@ -36,12 +36,8 @@ type InstrumentedAsyncRead = store::InstrumentedAsyncRead<'static, FuturesAsyncR
type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>;

pub(crate) type BlobReader = <Arc<FsBlobGuard> as BlobGuard>::Reader;
pub(crate) type SstPuffinManager = FsPuffinManager<
Arc<FsBlobGuard>,
Arc<FsDirGuard>,
InstrumentedAsyncRead,
InstrumentedAsyncWrite,
>;
pub(crate) type SstPuffinManager =
FsPuffinManager<Arc<BoundedStager>, ObjectStorePuffinFileAccessor>;

const STAGING_DIR: &str = "staging";

Expand Down Expand Up @@ -75,12 +71,12 @@ impl PuffinManagerFactory {
pub(crate) fn build(&self, store: ObjectStore) -> SstPuffinManager {
let store = InstrumentedStore::new(store).with_write_buffer_size(self.write_buffer_size);
let puffin_file_accessor = ObjectStorePuffinFileAccessor::new(store);
SstPuffinManager::new(self.stager.clone(), Arc::new(puffin_file_accessor))
SstPuffinManager::new(self.stager.clone(), puffin_file_accessor)
}
}

#[cfg(test)]
impl PuffinManagerFactory {
#[cfg(test)]
pub(crate) async fn new_for_test_async(
prefix: &str,
) -> (common_test_util::temp_dir::TempDir, Self) {
Expand All @@ -91,7 +87,6 @@ impl PuffinManagerFactory {
(tempdir, factory)
}

#[cfg(test)]
pub(crate) fn new_for_test_block(prefix: &str) -> (common_test_util::temp_dir::TempDir, Self) {
let tempdir = common_test_util::temp_dir::create_temp_dir(prefix);

Expand All @@ -103,6 +98,7 @@ impl PuffinManagerFactory {
}

/// A `PuffinFileAccessor` implementation that uses an object store as the underlying storage.
// `Clone` is derived because `FsPuffinManager`'s `PuffinManager` impl requires its
// file accessor type to be `PuffinFileAccessor + Clone`.
#[derive(Clone)]
pub(crate) struct ObjectStorePuffinFileAccessor {
/// The store backing this accessor, wrapped as an `InstrumentedStore`.
object_store: InstrumentedStore,
}
Expand Down Expand Up @@ -152,9 +148,7 @@ mod tests {
use futures::AsyncReadExt;
use object_store::services::Memory;
use puffin::blob_metadata::CompressionCodec;
use puffin::puffin_manager::{
BlobGuard, DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions,
};
use puffin::puffin_manager::{DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions};

use super::*;

Expand Down
1 change: 1 addition & 0 deletions src/puffin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ workspace = true
async-compression = { version = "0.4", features = ["futures-io", "zstd"] }
async-trait.workspace = true
async-walkdir = "2.0.0"
auto_impl = "1.2.0"
base64.workspace = true
bitflags.workspace = true
common-error.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions src/puffin/src/puffin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub struct PutOptions {

/// The `PuffinReader` trait provides methods for reading blobs and directories from a Puffin file.
#[async_trait]
#[auto_impl::auto_impl(Arc)]
pub trait PuffinReader {
type Blob: BlobGuard;
type Dir: DirGuard;
Expand All @@ -91,13 +92,15 @@ pub trait PuffinReader {

/// `BlobGuard` is provided by the `PuffinReader` to access the blob data.
/// Users should hold the `BlobGuard` until they are done with the blob data.
// `auto_impl(Arc)` also implements this trait for `Arc<T>` where `T: BlobGuard`,
// so implementors only need to implement it on the bare guard type.
#[auto_impl::auto_impl(Arc)]
pub trait BlobGuard {
/// The reader type used to access the blob data; it must support
/// asynchronous reads and seeks and be `Unpin`.
type Reader: AsyncRead + AsyncSeek + Unpin;
/// Returns a boxed future that resolves to `Result<Self::Reader>`.
/// The future is `'static`, so it does not borrow from `&self`.
fn reader(&self) -> BoxFuture<'static, Result<Self::Reader>>;
}

/// `DirGuard` is provided by the `PuffinReader` to access the directory in the filesystem.
/// Users should hold the `DirGuard` until they are done with the directory.
// `auto_impl(Arc)` also implements this trait for `Arc<T>` where `T: DirGuard`,
// so implementors only need to implement it on the bare guard type.
#[auto_impl::auto_impl(Arc)]
pub trait DirGuard {
/// Returns a reference to the path of the guarded directory.
fn path(&self) -> &PathBuf;
}
12 changes: 4 additions & 8 deletions src/puffin/src/puffin_manager/file_accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use async_trait::async_trait;
use futures::{AsyncRead, AsyncSeek, AsyncWrite};

use crate::error::Result;

/// `PuffinFileAccessor` is for opening readers and writers for puffin files.
#[async_trait]
pub trait PuffinFileAccessor {
type Reader: AsyncRead + AsyncSeek;
type Writer: AsyncWrite;
#[auto_impl::auto_impl(Arc)]
pub trait PuffinFileAccessor: Send + Sync + 'static {
type Reader: AsyncRead + AsyncSeek + Unpin + Send;
type Writer: AsyncWrite + Unpin + Send;

/// Opens a reader for the given puffin file.
async fn reader(&self, puffin_file_name: &str) -> Result<Self::Reader>;

/// Creates a writer for the given puffin file.
async fn writer(&self, puffin_file_name: &str) -> Result<Self::Writer>;
}

pub type PuffinFileAccessorRef<R, W> =
Arc<dyn PuffinFileAccessor<Reader = R, Writer = W> + Send + Sync>;
33 changes: 13 additions & 20 deletions src/puffin/src/puffin_manager/fs_puffin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,25 @@ mod reader;
mod writer;

use async_trait::async_trait;
use futures::{AsyncRead, AsyncSeek, AsyncWrite};
pub use reader::FsPuffinReader;
pub use writer::FsPuffinWriter;

use super::file_accessor::PuffinFileAccessor;
use crate::error::Result;
use crate::puffin_manager::file_accessor::PuffinFileAccessorRef;
use crate::puffin_manager::stager::StagerRef;
use crate::puffin_manager::{BlobGuard, DirGuard, PuffinManager};
use crate::puffin_manager::stager::Stager;
use crate::puffin_manager::PuffinManager;

/// `FsPuffinManager` is a `PuffinManager` that provides readers and writers for puffin data in the filesystem.
pub struct FsPuffinManager<B, D, AR, AW> {
pub struct FsPuffinManager<S, F> {
/// The stager.
stager: StagerRef<B, D>,

stager: S,
/// The puffin file accessor.
puffin_file_accessor: PuffinFileAccessorRef<AR, AW>,
puffin_file_accessor: F,
}

impl<B, D, AR, AW> FsPuffinManager<B, D, AR, AW> {
impl<S, F> FsPuffinManager<S, F> {
/// Creates a new `FsPuffinManager` with the specified `stager` and `puffin_file_accessor`.
pub fn new(
stager: StagerRef<B, D>,
puffin_file_accessor: PuffinFileAccessorRef<AR, AW>,
) -> Self {
pub fn new(stager: S, puffin_file_accessor: F) -> Self {
Self {
stager,
puffin_file_accessor,
Expand All @@ -49,15 +44,13 @@ impl<B, D, AR, AW> FsPuffinManager<B, D, AR, AW> {
}

#[async_trait]
impl<B, D, AR, AW> PuffinManager for FsPuffinManager<B, D, AR, AW>
impl<S, F> PuffinManager for FsPuffinManager<S, F>
where
B: BlobGuard,
D: DirGuard,
AR: AsyncRead + AsyncSeek + Send + Unpin + 'static,
AW: AsyncWrite + Send + Unpin + 'static,
S: Stager + Clone,
F: PuffinFileAccessor + Clone,
{
type Reader = FsPuffinReader<B, D, AR, AW>;
type Writer = FsPuffinWriter<B, D, AW>;
type Reader = FsPuffinReader<S, F>;
type Writer = FsPuffinWriter<S, F::Writer>;

async fn reader(&self, puffin_file_name: &str) -> Result<Self::Reader> {
Ok(FsPuffinReader::new(
Expand Down
46 changes: 19 additions & 27 deletions src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use async_compression::futures::bufread::ZstdDecoder;
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::io::BufReader;
use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite};
use futures::{AsyncRead, AsyncReadExt, AsyncWrite};
use snafu::{ensure, OptionExt, ResultExt};

use crate::blob_metadata::CompressionCodec;
Expand All @@ -25,29 +25,25 @@ use crate::error::{
ReadSnafu, Result, UnsupportedDecompressionSnafu, WriteSnafu,
};
use crate::file_format::reader::{AsyncReader, PuffinFileReader};
use crate::puffin_manager::file_accessor::PuffinFileAccessorRef;
use crate::puffin_manager::file_accessor::PuffinFileAccessor;
use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata;
use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, StagerRef};
use crate::puffin_manager::{BlobGuard, DirGuard, PuffinReader};
use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager};
use crate::puffin_manager::PuffinReader;

/// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files.
pub struct FsPuffinReader<B, G, AR, AW> {
pub struct FsPuffinReader<S, F> {
/// The name of the puffin file.
puffin_file_name: String,

/// The stager.
stager: StagerRef<B, G>,
stager: S,

/// The puffin file accessor.
puffin_file_accessor: PuffinFileAccessorRef<AR, AW>,
puffin_file_accessor: F,
}

impl<B, D, AR, AW> FsPuffinReader<B, D, AR, AW> {
pub(crate) fn new(
puffin_file_name: String,
stager: StagerRef<B, D>,
puffin_file_accessor: PuffinFileAccessorRef<AR, AW>,
) -> Self {
impl<S, F> FsPuffinReader<S, F> {
pub(crate) fn new(puffin_file_name: String, stager: S, puffin_file_accessor: F) -> Self {
Self {
puffin_file_name,
stager,
Expand All @@ -57,15 +53,13 @@ impl<B, D, AR, AW> FsPuffinReader<B, D, AR, AW> {
}

#[async_trait]
impl<B, D, AR, AW> PuffinReader for FsPuffinReader<B, D, AR, AW>
impl<S, F> PuffinReader for FsPuffinReader<S, F>
where
B: BlobGuard,
D: DirGuard,
AR: AsyncRead + AsyncSeek + Send + Unpin + 'static,
AW: AsyncWrite + 'static,
S: Stager,
F: PuffinFileAccessor + Clone,
{
type Blob = B;
type Dir = D;
type Blob = S::Blob;
type Dir = S::Dir;

async fn blob(&self, key: &str) -> Result<Self::Blob> {
self.stager
Expand Down Expand Up @@ -98,18 +92,16 @@ where
}
}

impl<B, G, AR, AW> FsPuffinReader<B, G, AR, AW>
impl<S, F> FsPuffinReader<S, F>
where
B: BlobGuard,
G: DirGuard,
AR: AsyncRead + AsyncSeek + Send + Unpin + 'static,
AW: AsyncWrite + 'static,
S: Stager,
F: PuffinFileAccessor,
{
fn init_blob_to_cache(
puffin_file_name: String,
key: String,
mut writer: BoxWriter,
accessor: PuffinFileAccessorRef<AR, AW>,
accessor: F,
) -> BoxFuture<'static, Result<u64>> {
Box::pin(async move {
let reader = accessor.reader(&puffin_file_name).await?;
Expand All @@ -134,7 +126,7 @@ where
puffin_file_name: String,
key: String,
writer_provider: DirWriterProviderRef,
accessor: PuffinFileAccessorRef<AR, AW>,
accessor: F,
) -> BoxFuture<'static, Result<u64>> {
Box::pin(async move {
let reader = accessor.reader(&puffin_file_name).await?;
Expand Down
22 changes: 10 additions & 12 deletions src/puffin/src/puffin_manager/fs_puffin_manager/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ use crate::error::{
};
use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter};
use crate::puffin_manager::fs_puffin_manager::dir_meta::{DirFileMetadata, DirMetadata};
use crate::puffin_manager::stager::StagerRef;
use crate::puffin_manager::{BlobGuard, DirGuard, PuffinWriter, PutOptions};
use crate::puffin_manager::stager::Stager;
use crate::puffin_manager::{PuffinWriter, PutOptions};

/// `FsPuffinWriter` is a `PuffinWriter` that writes blobs and directories to a puffin file.
pub struct FsPuffinWriter<B, D, W> {
pub struct FsPuffinWriter<S, W> {
/// The name of the puffin file.
puffin_file_name: String,

/// The stager.
stager: StagerRef<B, D>,
stager: S,

/// The underlying `PuffinFileWriter`.
puffin_file_writer: PuffinFileWriter<W>,
Expand All @@ -48,8 +48,8 @@ pub struct FsPuffinWriter<B, D, W> {
blob_keys: HashSet<String>,
}

impl<B, D, W> FsPuffinWriter<B, D, W> {
pub(crate) fn new(puffin_file_name: String, stager: StagerRef<B, D>, writer: W) -> Self {
impl<S, W> FsPuffinWriter<S, W> {
pub(crate) fn new(puffin_file_name: String, stager: S, writer: W) -> Self {
Self {
puffin_file_name,
stager,
Expand All @@ -60,10 +60,9 @@ impl<B, D, W> FsPuffinWriter<B, D, W> {
}

#[async_trait]
impl<B, D, W> PuffinWriter for FsPuffinWriter<B, D, W>
impl<S, W> PuffinWriter for FsPuffinWriter<S, W>
where
B: BlobGuard,
D: DirGuard,
S: Stager,
W: AsyncWrite + Unpin + Send,
{
async fn put_blob<R>(&mut self, key: &str, raw_data: R, options: PutOptions) -> Result<u64>
Expand Down Expand Up @@ -164,10 +163,9 @@ where
}
}

impl<B, G, W> FsPuffinWriter<B, G, W>
impl<S, W> FsPuffinWriter<S, W>
where
B: BlobGuard,
G: DirGuard,
S: Stager,
W: AsyncWrite + Unpin + Send,
{
/// Compresses the raw data and writes it to the puffin file.
Expand Down
6 changes: 2 additions & 4 deletions src/puffin/src/puffin_manager/stager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
mod bounded_stager;

use std::path::PathBuf;
use std::sync::Arc;

use async_trait::async_trait;
pub use bounded_stager::{BoundedStager, FsBlobGuard, FsDirGuard};
Expand Down Expand Up @@ -53,7 +52,8 @@ pub trait InitDirFn = Fn(DirWriterProviderRef) -> WriteResult;

/// `Stager` manages the staging area for the puffin files.
#[async_trait]
pub trait Stager {
#[auto_impl::auto_impl(Arc)]
pub trait Stager: Send + Sync {
type Blob: BlobGuard;
type Dir: DirGuard;

Expand Down Expand Up @@ -88,5 +88,3 @@ pub trait Stager {
dir_size: u64,
) -> Result<()>;
}

pub type StagerRef<B, D> = Arc<dyn Stager<Blob = B, Dir = D> + Send + Sync>;
4 changes: 2 additions & 2 deletions src/puffin/src/puffin_manager/stager/bounded_stager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ pub struct FsBlobGuard {
delete_queue: Sender<DeleteTask>,
}

impl BlobGuard for Arc<FsBlobGuard> {
impl BlobGuard for FsBlobGuard {
type Reader = Compat<fs::File>;

fn reader(&self) -> BoxFuture<'static, Result<Self::Reader>> {
Expand Down Expand Up @@ -460,7 +460,7 @@ pub struct FsDirGuard {
delete_queue: Sender<DeleteTask>,
}

impl DirGuard for Arc<FsDirGuard> {
impl DirGuard for FsDirGuard {
fn path(&self) -> &PathBuf {
&self.path
}
Expand Down
Loading