Skip to content

Commit

Permalink
perf(puffin): not to stage uncompressed blob (#4333)
Browse files Browse the repository at this point in the history
* feat(puffin): not to stage blob

Signed-off-by: Zhenchi <[email protected]>

* feat: back with compressed blob

Signed-off-by: Zhenchi <[email protected]>

---------

Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc authored Jul 11, 2024
1 parent da0c840 commit da1ea25
Show file tree
Hide file tree
Showing 14 changed files with 288 additions and 146 deletions.
4 changes: 0 additions & 4 deletions src/mito2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,9 +371,6 @@ pub struct InvertedIndexConfig {
/// Memory threshold for performing an external sort during index creation.
pub mem_threshold_on_create: MemoryThreshold,

/// Whether to compress the index data.
pub compress: bool,

#[deprecated = "use [IndexConfig::aux_path] instead"]
#[serde(skip_serializing)]
pub intermediate_path: String,
Expand All @@ -396,7 +393,6 @@ impl Default for InvertedIndexConfig {
create_on_compaction: Mode::Auto,
apply_on_query: Mode::Auto,
mem_threshold_on_create: MemoryThreshold::Auto,
compress: true,
write_buffer_size: ReadableSize::mb(8),
intermediate_path: String::new(),
metadata_cache_size: ReadableSize::mb(32),
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ async fn test_region_usage() {
flush_region(&engine, region_id, None).await;

let region_stat = region.region_usage();
assert_eq!(region_stat.sst_usage, 3026);
assert_eq!(region_stat.sst_usage, 3010);

// region total usage
// Some memtables may share items.
Expand Down
1 change: 0 additions & 1 deletion src/mito2/src/sst/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ impl<'a> IndexerBuilder<'a> {
self.intermediate_manager.clone(),
self.inverted_index_config.mem_threshold_on_create(),
segment_row_count,
self.inverted_index_config.compress,
&self.index_options.inverted_index.ignore_column_ids,
);

Expand Down
12 changes: 1 addition & 11 deletions src/mito2/src/sst/index/inverted_index/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use index::inverted_index::create::sort::external_sort::ExternalSorter;
use index::inverted_index::create::sort_create::SortIndexCreator;
use index::inverted_index::create::InvertedIndexCreator;
use index::inverted_index::format::writer::InvertedIndexBlobWriter;
use puffin::blob_metadata::CompressionCodec;
use puffin::puffin_manager::{PuffinWriter, PutOptions};
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
Expand Down Expand Up @@ -71,9 +70,6 @@ pub struct SstIndexCreator {
/// The memory usage of the index creator.
memory_usage: Arc<AtomicUsize>,

/// Whether to compress the index data.
compress: bool,

/// Ids of indexed columns.
column_ids: HashSet<ColumnId>,
}
Expand All @@ -87,7 +83,6 @@ impl SstIndexCreator {
intermediate_manager: IntermediateManager,
memory_usage_threshold: Option<usize>,
segment_row_count: NonZeroUsize,
compress: bool,
ignore_column_ids: &[ColumnId],
) -> Self {
let temp_file_provider = Arc::new(TempFileProvider::new(
Expand Down Expand Up @@ -122,7 +117,6 @@ impl SstIndexCreator {
stats: Statistics::default(),
aborted: false,
memory_usage,
compress,
column_ids,
}
}
Expand Down Expand Up @@ -242,12 +236,9 @@ impl SstIndexCreator {
let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write());

let put_options = PutOptions {
compression: self.compress.then_some(CompressionCodec::Zstd),
};
let (index_finish, puffin_add_blob) = futures::join!(
self.index_creator.finish(&mut index_writer),
puffin_writer.put_blob(INDEX_BLOB_TYPE, rx.compat(), put_options)
puffin_writer.put_blob(INDEX_BLOB_TYPE, rx.compat(), PutOptions::default())
);

match (
Expand Down Expand Up @@ -398,7 +389,6 @@ mod tests {
intm_mgr,
memory_threshold,
NonZeroUsize::new(segment_row_count).unwrap(),
false,
&[],
);

Expand Down
9 changes: 5 additions & 4 deletions src/mito2/src/sst/index/puffin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use object_store::{FuturesAsyncReader, FuturesAsyncWriter, ObjectStore};
use puffin::error::{self as puffin_error, Result as PuffinResult};
use puffin::puffin_manager::file_accessor::PuffinFileAccessor;
use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager;
use puffin::puffin_manager::stager::{BoundedStager, FsBlobGuard};
use puffin::puffin_manager::{BlobGuard, PuffinManager};
use puffin::puffin_manager::stager::BoundedStager;
use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
use snafu::ResultExt;

use crate::error::{PuffinInitStagerSnafu, Result};
Expand All @@ -35,10 +35,11 @@ use crate::sst::index::store::{self, InstrumentedStore};
type InstrumentedAsyncRead = store::InstrumentedAsyncRead<'static, FuturesAsyncReader>;
type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>;

pub(crate) type BlobReader = <Arc<FsBlobGuard> as BlobGuard>::Reader;
pub(crate) type SstPuffinWriter = <SstPuffinManager as PuffinManager>::Writer;
pub(crate) type SstPuffinManager =
FsPuffinManager<Arc<BoundedStager>, ObjectStorePuffinFileAccessor>;
pub(crate) type SstPuffinReader = <SstPuffinManager as PuffinManager>::Reader;
pub(crate) type SstPuffinWriter = <SstPuffinManager as PuffinManager>::Writer;
pub(crate) type BlobReader = <<SstPuffinReader as PuffinReader>::Blob as BlobGuard>::Reader;

const STAGING_DIR: &str = "staging";

Expand Down
9 changes: 9 additions & 0 deletions src/puffin/src/file_format/reader/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ impl<R> PuffinFileReader<R> {
);
Ok(())
}

/// Converts the reader into an owned blob reader.
pub fn into_blob_reader(self, blob_metadata: &BlobMetadata) -> PartialReader<R> {
PartialReader::new(
self.source,
blob_metadata.offset as _,
blob_metadata.length as _,
)
}
}

impl<'a, R: io::Read + io::Seek + 'a> SyncReader<'a> for PuffinFileReader<R> {
Expand Down
4 changes: 2 additions & 2 deletions src/puffin/src/puffin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ mod tests;
use std::path::PathBuf;

use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::{AsyncRead, AsyncSeek};

use crate::blob_metadata::CompressionCodec;
Expand Down Expand Up @@ -92,10 +91,11 @@ pub trait PuffinReader {

/// `BlobGuard` is provided by the `PuffinReader` to access the blob data.
/// Users should hold the `BlobGuard` until they are done with the blob data.
#[async_trait]
#[auto_impl::auto_impl(Arc)]
pub trait BlobGuard {
type Reader: AsyncRead + AsyncSeek + Unpin;
fn reader(&self) -> BoxFuture<'static, Result<Self::Reader>>;
async fn reader(&self) -> Result<Self::Reader>;
}

/// `DirGuard` is provided by the `PuffinReader` to access the directory in the filesystem.
Expand Down
2 changes: 1 addition & 1 deletion src/puffin/src/puffin_manager/file_accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::error::Result;
#[async_trait]
#[auto_impl::auto_impl(Arc)]
pub trait PuffinFileAccessor: Send + Sync + 'static {
type Reader: AsyncRead + AsyncSeek + Unpin + Send;
type Reader: AsyncRead + AsyncSeek + Unpin + Send + Sync;
type Writer: AsyncWrite + Unpin + Send;

/// Opens a reader for the given puffin file.
Expand Down
2 changes: 1 addition & 1 deletion src/puffin/src/puffin_manager/fs_puffin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl<S, F> FsPuffinManager<S, F> {
#[async_trait]
impl<S, F> PuffinManager for FsPuffinManager<S, F>
where
S: Stager + Clone,
S: Stager + Clone + 'static,
F: PuffinFileAccessor + Clone,
{
type Reader = FsPuffinReader<S, F>;
Expand Down
Loading

0 comments on commit da1ea25

Please sign in to comment.