Skip to content

Commit

Permalink
feat: customize copy to parquet parameter (#4328)
Browse files Browse the repository at this point in the history
* feat/copy-to-parquet-parameter: Commit Message:

Enhance Parquet Writer with Column-wise Configuration

Summary:

 • Introduced column_wise_config function to customize per-column properties in Parquet writer.

* feat/copy-to-parquet-parameter: Commit Message:

Enhance Parquet File Format Handling for Specific Data Types

Summary:

 • Added ConcreteDataType import to support specific data type handling.

* feat/copy-to-parquet-parameter: Commit Message:

 Refactor Parquet file format configuration

* feat/copy-to-parquet-parameter:
 Enhance Parquet file format handling for timestamp columns

 - Added logic to disable dictionary encoding and set DELTA_BINARY_PACKED encoding for timestamp columns in the Parquet file format configuration.

* feat/copy-to-parquet-parameter:
 Disable dictionary encoding for timestamp columns in Parquet writer and update default max_active_window_runs in TwcsOptions

 - Modified Parquet writer to disable dictionary encoding for timestamp columns to optimize for increasing timestamp data.

* feat/copy-to-parquet-parameter:
 Update compaction settings in tests

 - Modified `test_compaction_region` to include new compaction options: `compaction.type`,
 `compaction.twcs.max_active_window_runs`, and `compaction.twcs.max_inactive_window_runs`.
 - Updated `test_merge_mode_compaction` to use `compaction.twcs.max_active_window_runs` and
 `compaction.twcs.max_inactive_window_runs` instead of `max_active_window_files` and
 `max_inactive_window_files`.
  • Loading branch information
v0y4g3r authored Jul 10, 2024
1 parent 20417e6 commit da0c840
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 19 deletions.
36 changes: 29 additions & 7 deletions src/common/datasource/src/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ use datafusion::parquet::file::metadata::ParquetMetaData;
use datafusion::parquet::format::FileMetaData;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use futures::future::BoxFuture;
use futures::StreamExt;
use object_store::{FuturesAsyncReader, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::properties::WriterProperties;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use snafu::ResultExt;
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};

Expand Down Expand Up @@ -184,14 +186,16 @@ impl ArrowWriterCloser for ArrowWriter<SharedBuffer> {
/// Returns number of rows written.
pub async fn stream_to_parquet(
mut stream: SendableRecordBatchStream,
schema: datatypes::schema::SchemaRef,
store: ObjectStore,
path: &str,
concurrency: usize,
) -> Result<usize> {
let write_props = WriterProperties::builder()
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.build();
let schema = stream.schema();
let write_props = column_wise_config(
WriterProperties::builder().set_compression(Compression::ZSTD(ZstdLevel::default())),
schema,
)
.build();
let inner_writer = store
.writer_with(path)
.concurrent(concurrency)
Expand All @@ -200,7 +204,7 @@ pub async fn stream_to_parquet(
.map(|w| w.into_futures_async_write().compat_write())
.context(WriteObjectSnafu { path })?;

let mut writer = AsyncArrowWriter::try_new(inner_writer, schema, Some(write_props))
let mut writer = AsyncArrowWriter::try_new(inner_writer, stream.schema(), Some(write_props))
.context(WriteParquetSnafu { path })?;
let mut rows_written = 0;

Expand All @@ -216,6 +220,24 @@ pub async fn stream_to_parquet(
Ok(rows_written)
}

/// Customizes per-column properties.
fn column_wise_config(
mut props: WriterPropertiesBuilder,
schema: SchemaRef,
) -> WriterPropertiesBuilder {
// Disable dictionary for timestamp column, since for increasing timestamp column,
// the dictionary pages will be larger than data pages.
for col in schema.column_schemas() {
if col.data_type.is_timestamp() {
let path = ColumnPath::new(vec![col.name.clone()]);
props = props
.set_column_dictionary_enabled(path.clone(), false)
.set_column_encoding(path, Encoding::DELTA_BINARY_PACKED)
}
}
props
}

#[cfg(test)]
mod tests {
use common_test_util::find_workspace_path;
Expand Down
6 changes: 5 additions & 1 deletion src/mito2/src/engine/compaction_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ async fn test_compaction_region() {
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "1")
.insert_option("compaction.twcs.max_inactive_window_runs", "1")
.build();

let column_schemas = request
.column_metadatas
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/engine/merge_mode_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ async fn test_merge_mode_compaction() {
let request = CreateRequestBuilder::new()
.field_num(2)
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_files", "2")
.insert_option("compaction.twcs.max_inactive_window_files", "2")
.insert_option("compaction.twcs.max_active_window_runs", "1")
.insert_option("compaction.twcs.max_inactive_window_runs", "1")
.insert_option("merge_mode", "last_non_null")
.build();
let region_dir = request.region_dir.clone();
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/region/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl TwcsOptions {
impl Default for TwcsOptions {
fn default() -> Self {
Self {
max_active_window_runs: 1,
max_active_window_runs: 4,
max_inactive_window_runs: 1,
time_window: None,
remote_compaction: false,
Expand Down
20 changes: 12 additions & 8 deletions src/operator/src/statement/copy_table_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,18 @@ impl StatementExecutor {
)
.await
.context(error::WriteStreamToFileSnafu { path }),
Format::Parquet(_) => stream_to_parquet(
Box::pin(DfRecordBatchStreamAdapter::new(stream)),
object_store,
path,
WRITE_CONCURRENCY,
)
.await
.context(error::WriteStreamToFileSnafu { path }),
Format::Parquet(_) => {
let schema = stream.schema();
stream_to_parquet(
Box::pin(DfRecordBatchStreamAdapter::new(stream)),
schema,
object_store,
path,
WRITE_CONCURRENCY,
)
.await
.context(error::WriteStreamToFileSnafu { path })
}
_ => error::UnsupportedFormatSnafu { format: *format }.fail(),
}
}
Expand Down

0 comments on commit da0c840

Please sign in to comment.