Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to disable writing of Parquet offset index #6797

Merged
merged 7 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 64 additions & 10 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
data_pages: VecDeque<CompressedPage>,
// column index and offset index
column_index_builder: ColumnIndexBuilder,
offset_index_builder: OffsetIndexBuilder,
offset_index_builder: Option<OffsetIndexBuilder>,

// Below fields used to incrementally check boundary order across data pages.
// We assume they are ascending/descending until proven wrong.
Expand Down Expand Up @@ -394,6 +394,12 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
column_index_builder.to_invalid()
}

// Disable offset_index_builder if requested by user.
let offset_index_builder = match props.offset_index_disabled() {
false => Some(OffsetIndexBuilder::new()),
_ => None,
};

Self {
descr,
props,
Expand All @@ -408,7 +414,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
page_metrics,
column_metrics,
column_index_builder,
offset_index_builder: OffsetIndexBuilder::new(),
offset_index_builder,
encodings,
data_page_boundary_ascending: true,
data_page_boundary_descending: true,
Expand Down Expand Up @@ -613,7 +619,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
.column_index_builder
.valid()
.then(|| self.column_index_builder.build_to_thrift());
let offset_index = Some(self.offset_index_builder.build_to_thrift());

let offset_index = self.offset_index_builder.map(|b| b.build_to_thrift());

Ok(ColumnCloseResult {
bytes_written: self.column_metrics.total_bytes_written,
Expand Down Expand Up @@ -841,11 +848,10 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
);

// Update the offset index
self.offset_index_builder
.append_row_count(self.page_metrics.num_buffered_rows as i64);

self.offset_index_builder
.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
if let Some(builder) = self.offset_index_builder.as_mut() {
builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
}
}

/// Determine if we should allow truncating min/max values for this column's statistics
Expand Down Expand Up @@ -1174,8 +1180,10 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
let page_spec = self.page_writer.write_page(page)?;
// update offset index
// compressed_size = header_size + compressed_data_size
self.offset_index_builder
.append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32);
if let Some(builder) = self.offset_index_builder.as_mut() {
builder
.append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
}
self.update_metrics_for_page(page_spec);
Ok(())
}
Expand Down Expand Up @@ -3215,6 +3223,52 @@ mod tests {
assert!(column_close_result.column_index.is_none());
}

#[test]
fn test_no_offset_index_when_disabled() {
// Test that offset indexes can be disabled
let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
let props = Arc::new(
WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::None)
.set_offset_index_disabled(true)
.build(),
);
let column_writer = get_column_writer(descr, props, get_test_page_writer());
let mut writer = get_typed_column_writer::<Int32Type>(column_writer);

let data = Vec::new();
let def_levels = vec![0; 10];
writer.write_batch(&data, Some(&def_levels), None).unwrap();
writer.flush_data_pages().unwrap();

let column_close_result = writer.close().unwrap();
assert!(column_close_result.offset_index.is_none());
assert!(column_close_result.column_index.is_none());
}

#[test]
fn test_offset_index_overridden() {
// Test that offset indexes are not disabled when gathering page statistics
let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
let props = Arc::new(
WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::Page)
.set_offset_index_disabled(true)
.build(),
);
let column_writer = get_column_writer(descr, props, get_test_page_writer());
let mut writer = get_typed_column_writer::<Int32Type>(column_writer);

let data = Vec::new();
let def_levels = vec![0; 10];
writer.write_batch(&data, Some(&def_levels), None).unwrap();
writer.flush_data_pages().unwrap();

let column_close_result = writer.close().unwrap();
assert!(column_close_result.offset_index.is_some());
assert!(column_close_result.column_index.is_some());
}

#[test]
fn test_boundary_order() -> Result<()> {
let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
Expand Down
37 changes: 37 additions & 0 deletions parquet/src/file/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ pub const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.05;
pub const DEFAULT_BLOOM_FILTER_NDV: u64 = 1_000_000_u64;
/// Default values for [`WriterProperties::statistics_truncate_length`]
pub const DEFAULT_STATISTICS_TRUNCATE_LENGTH: Option<usize> = None;
/// Default value for [`WriterProperties::offset_index_disabled`]
pub const DEFAULT_OFFSET_INDEX_DISABLED: bool = false;
/// Default values for [`WriterProperties::coerce_types`]
pub const DEFAULT_COERCE_TYPES: bool = false;

Expand Down Expand Up @@ -159,6 +161,7 @@ pub struct WriterProperties {
bloom_filter_position: BloomFilterPosition,
writer_version: WriterVersion,
created_by: String,
offset_index_disabled: bool,
pub(crate) key_value_metadata: Option<Vec<KeyValue>>,
default_column_properties: ColumnProperties,
column_properties: HashMap<ColumnPath, ColumnProperties>,
Expand Down Expand Up @@ -244,6 +247,22 @@ impl WriterProperties {
&self.created_by
}

/// Returns `true` if offset index writing is disabled.
pub fn offset_index_disabled(&self) -> bool {
// If page statistics are to be collected, then do not disable the offset indexes.
let default_page_stats_enabled =
self.default_column_properties.statistics_enabled() == Some(EnabledStatistics::Page);
let column_page_stats_enabled = self
.column_properties
.iter()
.any(|path_props| path_props.1.statistics_enabled() == Some(EnabledStatistics::Page));
if default_page_stats_enabled || column_page_stats_enabled {
return false;
}

self.offset_index_disabled
}

/// Returns `key_value_metadata` KeyValue pairs.
pub fn key_value_metadata(&self) -> Option<&Vec<KeyValue>> {
self.key_value_metadata.as_ref()
Expand Down Expand Up @@ -371,6 +390,7 @@ pub struct WriterPropertiesBuilder {
bloom_filter_position: BloomFilterPosition,
writer_version: WriterVersion,
created_by: String,
offset_index_disabled: bool,
key_value_metadata: Option<Vec<KeyValue>>,
default_column_properties: ColumnProperties,
column_properties: HashMap<ColumnPath, ColumnProperties>,
Expand All @@ -392,6 +412,7 @@ impl WriterPropertiesBuilder {
bloom_filter_position: DEFAULT_BLOOM_FILTER_POSITION,
writer_version: DEFAULT_WRITER_VERSION,
created_by: DEFAULT_CREATED_BY.to_string(),
offset_index_disabled: DEFAULT_OFFSET_INDEX_DISABLED,
key_value_metadata: None,
default_column_properties: Default::default(),
column_properties: HashMap::new(),
Expand All @@ -413,6 +434,7 @@ impl WriterPropertiesBuilder {
bloom_filter_position: self.bloom_filter_position,
writer_version: self.writer_version,
created_by: self.created_by,
offset_index_disabled: self.offset_index_disabled,
key_value_metadata: self.key_value_metadata,
default_column_properties: self.default_column_properties,
column_properties: self.column_properties,
Expand Down Expand Up @@ -515,6 +537,21 @@ impl WriterPropertiesBuilder {
self
}

/// Sets whether the writing of offset indexes is disabled (defaults to `false`).
///
/// If statistics level is set to [`Page`] this setting will be overridden with `false`.
///
/// Note: As the offset indexes are useful for accessing data by row number,
/// they are always written by default, regardless of whether other statistics
/// are enabled. Disabling this metadata may result in a degradation in read
/// performance, so use this option with care.
///
/// [`Page`]: EnabledStatistics::Page
pub fn set_offset_index_disabled(mut self, value: bool) -> Self {
self.offset_index_disabled = value;
self
}

/// Sets "key_value_metadata" property (defaults to `None`).
pub fn set_key_value_metadata(mut self, value: Option<Vec<KeyValue>>) -> Self {
self.key_value_metadata = value;
Expand Down
1 change: 1 addition & 0 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1742,6 +1742,7 @@ mod tests {
let props = WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::None)
.set_column_statistics_enabled("a".into(), EnabledStatistics::Page)
.set_offset_index_disabled(true) // this should be ignored because of the line above
.build();
let mut file = Vec::with_capacity(1024);
let mut file_writer =
Expand Down
Loading