feat(puffin): not to stage blob
Signed-off-by: Zhenchi <[email protected]>
zhongzc committed Jul 10, 2024
1 parent 52a9a74 commit 9bb3a40
Showing 9 changed files with 127 additions and 131 deletions.
4 changes: 0 additions & 4 deletions src/mito2/src/config.rs
@@ -371,9 +371,6 @@ pub struct InvertedIndexConfig {
/// Memory threshold for performing an external sort during index creation.
pub mem_threshold_on_create: MemoryThreshold,

/// Whether to compress the index data.
pub compress: bool,

#[deprecated = "use [IndexConfig::aux_path] instead"]
#[serde(skip_serializing)]
pub intermediate_path: String,
@@ -396,7 +393,6 @@ impl Default for InvertedIndexConfig {
create_on_compaction: Mode::Auto,
apply_on_query: Mode::Auto,
mem_threshold_on_create: MemoryThreshold::Auto,
compress: true,
write_buffer_size: ReadableSize::mb(8),
intermediate_path: String::new(),
metadata_cache_size: ReadableSize::mb(32),
2 changes: 1 addition & 1 deletion src/mito2/src/engine/basic_test.rs
@@ -580,7 +580,7 @@ async fn test_region_usage() {
flush_region(&engine, region_id, None).await;

let region_stat = region.region_usage();
assert_eq!(region_stat.sst_usage, 3026);
assert_eq!(region_stat.sst_usage, 3010);

// region total usage
// Some memtables may share items.
1 change: 0 additions & 1 deletion src/mito2/src/sst/index.rs
@@ -220,7 +220,6 @@ impl<'a> IndexerBuilder<'a> {
self.intermediate_manager.clone(),
self.inverted_index_config.mem_threshold_on_create(),
segment_row_count,
self.inverted_index_config.compress,
&self.index_options.inverted_index.ignore_column_ids,
);

12 changes: 1 addition & 11 deletions src/mito2/src/sst/index/inverted_index/creator.rs
@@ -24,7 +24,6 @@ use index::inverted_index::create::sort::external_sort::ExternalSorter;
use index::inverted_index::create::sort_create::SortIndexCreator;
use index::inverted_index::create::InvertedIndexCreator;
use index::inverted_index::format::writer::InvertedIndexBlobWriter;
use puffin::blob_metadata::CompressionCodec;
use puffin::puffin_manager::{PuffinWriter, PutOptions};
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
@@ -71,9 +70,6 @@ pub struct SstIndexCreator {
/// The memory usage of the index creator.
memory_usage: Arc<AtomicUsize>,

/// Whether to compress the index data.
compress: bool,

/// Ids of indexed columns.
column_ids: HashSet<ColumnId>,
}
@@ -87,7 +83,6 @@ impl SstIndexCreator {
intermediate_manager: IntermediateManager,
memory_usage_threshold: Option<usize>,
segment_row_count: NonZeroUsize,
compress: bool,
ignore_column_ids: &[ColumnId],
) -> Self {
let temp_file_provider = Arc::new(TempFileProvider::new(
@@ -122,7 +117,6 @@ impl SstIndexCreator {
stats: Statistics::default(),
aborted: false,
memory_usage,
compress,
column_ids,
}
}
@@ -242,12 +236,9 @@ impl SstIndexCreator {
let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write());

let put_options = PutOptions {
compression: self.compress.then_some(CompressionCodec::Zstd),
};
let (index_finish, puffin_add_blob) = futures::join!(
self.index_creator.finish(&mut index_writer),
puffin_writer.put_blob(INDEX_BLOB_TYPE, rx.compat(), put_options)
puffin_writer.put_blob(INDEX_BLOB_TYPE, rx.compat(), PutOptions::default())
);

match (
@@ -398,7 +389,6 @@ mod tests {
intm_mgr,
memory_threshold,
NonZeroUsize::new(segment_row_count).unwrap(),
false,
&[],
);

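Because the reader now performs random reads directly on the blob inside the puffin file (see src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs below), the blob must be stored uncompressed, so the `compress` knob is removed and the writer falls back to the default put options. A before/after sketch of the `put_blob` call site, using only names from this diff (the surrounding `SstIndexCreator` method and its error handling are assumed context, not shown here):

// Before this commit: compression followed InvertedIndexConfig::compress (Zstd when enabled).
let put_options = PutOptions {
    compression: self.compress.then_some(CompressionCodec::Zstd),
};
let (index_finish, puffin_add_blob) = futures::join!(
    self.index_creator.finish(&mut index_writer),
    puffin_writer.put_blob(INDEX_BLOB_TYPE, rx.compat(), put_options)
);

// After this commit: PutOptions::default() leaves the compression codec unset, so the
// blob stays uncompressed and can be served by random access without staging.
let (index_finish, puffin_add_blob) = futures::join!(
    self.index_creator.finish(&mut index_writer),
    puffin_writer.put_blob(INDEX_BLOB_TYPE, rx.compat(), PutOptions::default())
);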
9 changes: 5 additions & 4 deletions src/mito2/src/sst/index/puffin_manager.rs
@@ -21,8 +21,8 @@ use object_store::{FuturesAsyncReader, FuturesAsyncWriter, ObjectStore};
use puffin::error::{self as puffin_error, Result as PuffinResult};
use puffin::puffin_manager::file_accessor::PuffinFileAccessor;
use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager;
use puffin::puffin_manager::stager::{BoundedStager, FsBlobGuard};
use puffin::puffin_manager::{BlobGuard, PuffinManager};
use puffin::puffin_manager::stager::BoundedStager;
use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
use snafu::ResultExt;

use crate::error::{PuffinInitStagerSnafu, Result};
@@ -35,10 +35,11 @@ use crate::sst::index::store::{self, InstrumentedStore};
type InstrumentedAsyncRead = store::InstrumentedAsyncRead<'static, FuturesAsyncReader>;
type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>;

pub(crate) type BlobReader = <Arc<FsBlobGuard> as BlobGuard>::Reader;
pub(crate) type SstPuffinWriter = <SstPuffinManager as PuffinManager>::Writer;
pub(crate) type SstPuffinManager =
FsPuffinManager<Arc<BoundedStager>, ObjectStorePuffinFileAccessor>;
pub(crate) type SstPuffinReader = <SstPuffinManager as PuffinManager>::Reader;
pub(crate) type SstPuffinWriter = <SstPuffinManager as PuffinManager>::Writer;
pub(crate) type BlobReader = <<SstPuffinReader as PuffinReader>::Blob as BlobGuard>::Reader;

const STAGING_DIR: &str = "staging";

9 changes: 9 additions & 0 deletions src/puffin/src/file_format/reader/file.rs
@@ -61,6 +61,15 @@ impl<R> PuffinFileReader<R> {
);
Ok(())
}

/// Converts the reader into an owned blob reader.
pub fn into_blob_reader(self, blob_metadata: &BlobMetadata) -> PartialReader<R> {
PartialReader::new(
self.source,
blob_metadata.offset as _,
blob_metadata.length as _,
)
}
}

impl<'a, R: io::Read + io::Seek + 'a> SyncReader<'a> for PuffinFileReader<R> {
93 changes: 75 additions & 18 deletions src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs
@@ -19,16 +19,17 @@ use futures::io::BufReader;
use futures::{AsyncRead, AsyncReadExt, AsyncWrite};
use snafu::{ensure, OptionExt, ResultExt};

use crate::blob_metadata::CompressionCodec;
use crate::blob_metadata::{BlobMetadata, CompressionCodec};
use crate::error::{
BlobIndexOutOfBoundSnafu, BlobNotFoundSnafu, DeserializeJsonSnafu, FileKeyNotMatchSnafu,
ReadSnafu, Result, UnsupportedDecompressionSnafu, WriteSnafu,
};
use crate::file_format::reader::{AsyncReader, PuffinFileReader};
use crate::partial_reader::PartialReader;
use crate::puffin_manager::file_accessor::PuffinFileAccessor;
use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata;
use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager};
use crate::puffin_manager::PuffinReader;
use crate::puffin_manager::{BlobGuard, PuffinReader};

/// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files.
pub struct FsPuffinReader<S, F> {
@@ -58,22 +59,26 @@
S: Stager,
F: PuffinFileAccessor + Clone,
{
type Blob = S::Blob;
type Blob = RandomReadBlob<F>;
type Dir = S::Dir;

async fn blob(&self, key: &str) -> Result<Self::Blob> {
self.stager
.get_blob(
self.puffin_file_name.as_str(),
key,
Box::new(move |writer| {
let accessor = self.puffin_file_accessor.clone();
let puffin_file_name = self.puffin_file_name.clone();
let key = key.to_string();
Self::init_blob_to_cache(puffin_file_name, key, writer, accessor)
}),
)
.await
let blob_metadata = self.blob_metadata(key).await?;

// If we choose to perform random reads directly on blobs
// within the puffin file, then they must not be compressed.
ensure!(
blob_metadata.compression_codec.is_none(),
UnsupportedDecompressionSnafu {
decompression: blob_metadata.compression_codec.unwrap().to_string()
}
);

Ok(RandomReadBlob {
file_name: self.puffin_file_name.clone(),
accessor: self.puffin_file_accessor.clone(),
blob_metadata: blob_metadata.clone(),
})
}

async fn dir(&self, key: &str) -> Result<Self::Dir> {
@@ -85,7 +90,7 @@ where
let accessor = self.puffin_file_accessor.clone();
let puffin_file_name = self.puffin_file_name.clone();
let key = key.to_string();
Self::init_dir_to_cache(puffin_file_name, key, writer_provider, accessor)
Self::init_dir_to_stager(puffin_file_name, key, writer_provider, accessor)
}),
)
.await
@@ -97,7 +102,27 @@
S: Stager,
F: PuffinFileAccessor,
{
fn init_blob_to_cache(
// TODO(zhongzc): cache the metadata
async fn blob_metadata(&self, key: &str) -> Result<BlobMetadata> {
let reader = self
.puffin_file_accessor
.reader(&self.puffin_file_name)
.await?;
let mut file = PuffinFileReader::new(reader);

let metadata = file.metadata().await?;
let blob_metadata = metadata
.blobs
.into_iter()
.find(|m| m.blob_type == key)
.context(BlobNotFoundSnafu { blob: key })?;

Ok(blob_metadata)
}

// TODO(zhongzc): keep the function in case one day we need to stage the blob.
#[allow(dead_code)]
fn init_blob_to_stager(
puffin_file_name: String,
key: String,
mut writer: BoxWriter,
@@ -122,7 +147,7 @@ where
})
}

fn init_dir_to_cache(
fn init_dir_to_stager(
puffin_file_name: String,
key: String,
writer_provider: DirWriterProviderRef,
@@ -196,3 +221,35 @@
}
}
}

/// `RandomReadBlob` is a `BlobGuard` that directly reads the blob from the puffin file.
pub struct RandomReadBlob<F> {
file_name: String,
accessor: F,
blob_metadata: BlobMetadata,
}

impl<F: PuffinFileAccessor + Clone> BlobGuard for RandomReadBlob<F> {
type Reader = PartialReader<F::Reader>;

fn reader(&self) -> BoxFuture<'static, Result<Self::Reader>> {
let accessor = self.accessor.clone();
let file_name = self.file_name.clone();
let blob_metadata = self.blob_metadata.clone();

Box::pin(async move {
// If we choose to perform random reads directly on blobs
// within the puffin file, then they must not be compressed.
ensure!(
blob_metadata.compression_codec.is_none(),
UnsupportedDecompressionSnafu {
decompression: blob_metadata.compression_codec.unwrap().to_string()
}
);

let reader = accessor.reader(&file_name).await?;
let blob_reader = PuffinFileReader::new(reader).into_blob_reader(&blob_metadata);
Ok(blob_reader)
})
}
}
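
Taken together, `blob_metadata` and `RandomReadBlob::reader` replace the old stager round-trip: the blob is located through the puffin footer and then read in place. A condensed sketch of that path, assuming the same imports and snafu error selectors as reader.rs above (the function name `open_blob_in_place` and the reuse of a single accessor reader are illustrative choices, not part of the commit):

async fn open_blob_in_place<F: PuffinFileAccessor>(
    accessor: &F,
    puffin_file_name: &str,
    key: &str,
) -> Result<PartialReader<F::Reader>> {
    // Locate the blob through the puffin footer metadata.
    let mut file = PuffinFileReader::new(accessor.reader(puffin_file_name).await?);
    let metadata = file.metadata().await?;
    let blob_metadata = metadata
        .blobs
        .into_iter()
        .find(|m| m.blob_type == key)
        .context(BlobNotFoundSnafu { blob: key })?;

    // Random reads are only valid on uncompressed blobs.
    ensure!(
        blob_metadata.compression_codec.is_none(),
        UnsupportedDecompressionSnafu {
            decompression: blob_metadata.compression_codec.unwrap().to_string()
        }
    );

    // Restrict the reader to the blob's byte range; nothing is copied to local disk.
    Ok(file.into_blob_reader(&blob_metadata))
}

Compared with the retained-but-unused `init_blob_to_stager` path, this trades the local staging copy for one extra footer read per access, which is what the `TODO(zhongzc): cache the metadata` note above is about.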