Skip to content

Commit

Permalink
[ENH] Add metadata to block
Browse files Browse the repository at this point in the history
  • Loading branch information
HammadB committed Oct 9, 2024
1 parent 9216ddb commit 3eb8123
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 44 deletions.
14 changes: 12 additions & 2 deletions rust/blockstore/src/arrow/block/delta/delta.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use super::storage::BlockStorage;
use crate::{
arrow::types::{ArrowWriteableKey, ArrowWriteableValue},
Expand Down Expand Up @@ -66,9 +68,17 @@ impl BlockDelta {
self.builder.get_size::<K>()
}

/// Finishes the block delta and converts it into a record batch.
///
/// Consumes `self` and hands the underlying builder to
/// `into_record_batch::<K>`, forwarding `metadata` unchanged.
/// # Arguments
/// - metadata: the metadata to attach to the record batch. Pass `None` when
///   there is no metadata to attach.
/// # Returns
/// A record batch with the key value pairs in the block delta.
/// # Panics
/// NOTE(review): the `into_record_batch` shown in this change unwraps the
/// result of building the record batch, so a failure there presumably
/// surfaces here as a panic -- confirm.
// `V` is not referenced in the body (only `K` is passed on), hence the allow below.
#[allow(clippy::extra_unused_type_parameters)]
pub fn finish<K: ArrowWriteableKey, V: ArrowWriteableValue>(self) -> RecordBatch {
self.builder.into_record_batch::<K>()
pub fn finish<K: ArrowWriteableKey, V: ArrowWriteableValue>(
self,
metadata: Option<HashMap<String, String>>,
) -> RecordBatch {
self.builder.into_record_batch::<K>(metadata)
}

/// Splits the block delta into two block deltas. The split point is the last key
Expand Down
80 changes: 47 additions & 33 deletions rust/blockstore/src/arrow/block/delta/single_column_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@ use crate::{
Value,
};
use arrow::{
array::{
Array, BinaryBuilder, ListBuilder, RecordBatch, StringBuilder, UInt32Array, UInt32Builder,
},
datatypes::Field,
array::{Array, BinaryBuilder, ListBuilder, StringBuilder, UInt32Array, UInt32Builder},
datatypes::{Field, Schema},
util::bit_util,
};
use parking_lot::RwLock;
use roaring::RoaringBitmap;
use std::{collections::BTreeMap, sync::Arc};
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
vec,
};

#[derive(Clone)]
pub struct SingleColumnStorage<T: ArrowWriteableValue> {
Expand Down Expand Up @@ -222,7 +224,8 @@ impl SingleColumnStorage<String> {
pub(super) fn into_arrow(
self,
key_builder: BlockKeyArrowBuilder,
) -> Result<RecordBatch, arrow::error::ArrowError> {
metadata: Option<HashMap<String, String>>,
) -> (Arc<Schema>, Vec<Arc<dyn Array>>) {
// Build key and value.
let mut key_builder = key_builder;
let item_capacity = self.len();
Expand Down Expand Up @@ -250,20 +253,23 @@ impl SingleColumnStorage<String> {
let value_field = Field::new("value", arrow::datatypes::DataType::Utf8, false);
let value_arr = value_builder.finish();
let value_arr = (&value_arr as &dyn Array).slice(0, value_arr.len());
let schema = Arc::new(arrow::datatypes::Schema::new(vec![
prefix_field,
key_field,
value_field,
]));
RecordBatch::try_new(schema, vec![prefix_arr, key_arr, value_arr])
let schema = arrow::datatypes::Schema::new(vec![prefix_field, key_field, value_field]);

if let Some(metadata) = metadata {
let schema = schema.with_metadata(metadata);
return (schema.into(), vec![prefix_arr, key_arr, value_arr]);
}

(schema.into(), vec![prefix_arr, key_arr, value_arr])
}
}

impl SingleColumnStorage<Vec<u32>> {
pub(super) fn into_arrow(
self,
key_builder: BlockKeyArrowBuilder,
) -> Result<RecordBatch, arrow::error::ArrowError> {
metadata: Option<HashMap<String, String>>,
) -> (Arc<Schema>, Vec<Arc<dyn Array>>) {
// Build key and value.
let mut key_builder = key_builder;
let item_capacity = self.len();
Expand Down Expand Up @@ -303,20 +309,23 @@ impl SingleColumnStorage<Vec<u32>> {
);
let value_arr = value_builder.finish();
let value_arr = (&value_arr as &dyn Array).slice(0, value_arr.len());
let schema = Arc::new(arrow::datatypes::Schema::new(vec![
prefix_field,
key_field,
value_field,
]));
RecordBatch::try_new(schema, vec![prefix_arr, key_arr, value_arr])
let schema = arrow::datatypes::Schema::new(vec![prefix_field, key_field, value_field]);

if let Some(metadata) = metadata {
let schema = schema.with_metadata(metadata);
return (schema.into(), vec![prefix_arr, key_arr, value_arr]);
}

(schema.into(), vec![prefix_arr, key_arr, value_arr])
}
}

impl SingleColumnStorage<u32> {
pub(super) fn into_arrow(
self,
key_builder: BlockKeyArrowBuilder,
) -> Result<RecordBatch, arrow::error::ArrowError> {
metadata: Option<HashMap<String, String>>,
) -> (Arc<Schema>, Vec<Arc<dyn Array>>) {
// Build key and value.
let mut key_builder = key_builder;
let mut value_builder;
Expand All @@ -343,20 +352,23 @@ impl SingleColumnStorage<u32> {
let value_field = Field::new("value", arrow::datatypes::DataType::UInt32, false);
let value_arr = value_builder.finish();
let value_arr = (&value_arr as &dyn Array).slice(0, value_arr.len());
let schema = Arc::new(arrow::datatypes::Schema::new(vec![
prefix_field,
key_field,
value_field,
]));
RecordBatch::try_new(schema, vec![prefix_arr, key_arr, value_arr])
let schema = arrow::datatypes::Schema::new(vec![prefix_field, key_field, value_field]);

if let Some(metadata) = metadata {
let schema = schema.with_metadata(metadata);
return (schema.into(), vec![prefix_arr, key_arr, value_arr]);
}

(schema.into(), vec![prefix_arr, key_arr, value_arr])
}
}

impl SingleColumnStorage<RoaringBitmap> {
pub(super) fn into_arrow(
self,
key_builder: BlockKeyArrowBuilder,
) -> Result<RecordBatch, arrow::error::ArrowError> {
metadata: Option<HashMap<String, String>>,
) -> (Arc<Schema>, Vec<Arc<dyn Array>>) {
// Build key.
let mut key_builder = key_builder;
let item_capacity = self.len();
Expand Down Expand Up @@ -394,11 +406,13 @@ impl SingleColumnStorage<RoaringBitmap> {
let value_field = Field::new("value", arrow::datatypes::DataType::Binary, true);
let value_arr = value_builder.finish();
let value_arr = (&value_arr as &dyn Array).slice(0, value_arr.len());
let schema = Arc::new(arrow::datatypes::Schema::new(vec![
prefix_field,
key_field,
value_field,
]));
RecordBatch::try_new(schema, vec![prefix_arr, key_arr, value_arr])
let schema = arrow::datatypes::Schema::new(vec![prefix_field, key_field, value_field]);

if let Some(metadata) = metadata {
let schema = schema.with_metadata(metadata);
return (schema.into(), vec![prefix_arr, key_arr, value_arr]);
}

(schema.into(), vec![prefix_arr, key_arr, value_arr])
}
}
21 changes: 14 additions & 7 deletions rust/blockstore/src/arrow/block/delta/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use arrow::{
};
use roaring::RoaringBitmap;
use std::{
fmt,
fmt::{Debug, Formatter},
collections::HashMap,
fmt::{self, Debug, Formatter},
};

#[derive(Clone)]
Expand Down Expand Up @@ -220,29 +220,36 @@ impl BlockStorage {
}
}

/// Consumes the storage and converts it into an Arrow `RecordBatch`.
///
/// A key builder is first obtained from `K::get_arrow_builder`, sized from
/// this storage's `len()`, `get_prefix_size()` and `get_key_size()`. The
/// storage variant then decides which `into_arrow` implementation runs.
/// # Arguments
/// - metadata: optional key/value pairs. It is passed to `into_arrow` for the
///   `String`, `UInt32`, `VecUInt32` and `RoaringBitmap` variants.
///   NOTE(review): the `DataRecord` arm does not receive `metadata`, so it is
///   not forwarded for that variant -- confirm whether this is intended.
/// # Returns
/// The record batch built from the selected variant.
/// # Panics
/// Every arm calls `unwrap()`, so an `Err` from `RecordBatch::try_new` (or
/// from the `DataRecord` builder's `into_arrow`) panics instead of being
/// returned to the caller.
pub fn into_record_batch<K: ArrowWriteableKey>(self) -> RecordBatch {
pub fn into_record_batch<K: ArrowWriteableKey>(
self,
metadata: Option<HashMap<String, String>>,
) -> RecordBatch {
// The key builder must be created before `self` is moved into the match below.
let key_builder =
K::get_arrow_builder(self.len(), self.get_prefix_size(), self.get_key_size());
match self {
BlockStorage::String(builder) => {
// TODO: handle error (the unwrap below panics on failure)
builder.into_arrow(key_builder).unwrap()
let (schema, columns) = builder.into_arrow(key_builder, metadata);
RecordBatch::try_new(schema, columns).unwrap()
}
BlockStorage::UInt32(builder) => {
// TODO: handle error (the unwrap below panics on failure)
builder.into_arrow(key_builder).unwrap()
let (schema, columns) = builder.into_arrow(key_builder, metadata);
RecordBatch::try_new(schema, columns).unwrap()
}
BlockStorage::DataRecord(builder) => {
// TODO: handle error (the unwrap below panics on failure)
// NOTE(review): `metadata` is not passed to this builder -- confirm.
builder.into_arrow(key_builder).unwrap()
}
BlockStorage::VecUInt32(builder) => {
// TODO: handle error (the unwrap below panics on failure)
builder.into_arrow(key_builder).unwrap()
let (schema, columns) = builder.into_arrow(key_builder, metadata);
RecordBatch::try_new(schema, columns).unwrap()
}
BlockStorage::RoaringBitmap(builder) => {
// TODO: handle error (the unwrap below panics on failure)
builder.into_arrow(key_builder).unwrap()
let (schema, columns) = builder.into_arrow(key_builder, metadata);
RecordBatch::try_new(schema, columns).unwrap()
}
}
}
Expand Down
1 change: 1 addition & 0 deletions rust/blockstore/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ mod concurrency_test;
pub mod config;
pub(crate) mod flusher;
pub mod provider;
mod root;
pub mod sparse_index;
pub mod types;
2 changes: 1 addition & 1 deletion rust/blockstore/src/arrow/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ impl BlockManager {
delta: BlockDelta,
) -> Block {
let delta_id = delta.id;
let record_batch = delta.finish::<K, V>();
let record_batch = delta.finish::<K, V>(None);
Block::from_record_batch(delta_id, record_batch)
}

Expand Down
2 changes: 1 addition & 1 deletion rust/blockstore/src/arrow/sparse_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ impl SparseIndex {
}

let delta_id = delta.id;
let record_batch = delta.finish::<K, String>();
let record_batch = delta.finish::<K, String>(None);
Ok(Block::from_record_batch(delta_id, record_batch))
}

Expand Down

0 comments on commit 3eb8123

Please sign in to comment.