fix: dictionary key type use u32 (#4396)
* fix: dictionary key type use u32

* fix: fix error while reading content

* fix: bulk memtable dictionary type
evenyag authored Jul 19, 2024
1 parent b90267d commit 7aae19a
Showing 3 changed files with 25 additions and 21 deletions.
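
A u16 dictionary key can address at most 65,536 distinct values, so encoding the `__primary_key` column with UInt16 keys silently caps the number of distinct primary keys a part can hold; widening the keys to UInt32 lifts that cap to roughly 4.29 billion. The sketch below is not part of the patch: it only shows the shape of the data this commit is about, using arrow-rs directly. The standalone `PrimaryKeyArray` alias mirrors the one the patch adds, and the `host-a`/`host-b` values are made up for illustration.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, BinaryArray, DictionaryArray, UInt32Array};
use arrow::datatypes::UInt32Type;

// Mirrors the alias the patch adds in src/mito2/src/sst/parquet/format.rs.
type PrimaryKeyArray = DictionaryArray<UInt32Type>;

fn main() {
    // Two distinct primary keys, three rows; each key indexes into `values`.
    let values: ArrayRef = Arc::new(BinaryArray::from_iter_values([
        b"host-a".as_slice(),
        b"host-b".as_slice(),
    ]));
    // u16 keys cap the dictionary at 65_536 distinct primary keys;
    // u32 keys raise that limit to roughly 4.29 billion.
    let keys = UInt32Array::from(vec![0u32, 0, 1]);
    let array: PrimaryKeyArray = DictionaryArray::new(keys, values);
    assert_eq!(array.len(), 3);
}
```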
src/mito2/src/memtable/bulk/part.rs (10 additions, 9 deletions)
@@ -22,11 +22,11 @@ use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder};
use datatypes::arrow;
use datatypes::arrow::array::{
Array, ArrayRef, BinaryBuilder, DictionaryArray, RecordBatch, TimestampMicrosecondArray,
-TimestampMillisecondArray, TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array,
-UInt8Array, UInt8Builder,
+TimestampMillisecondArray, TimestampSecondArray, UInt32Array, UInt64Array, UInt8Array,
+UInt8Builder,
};
use datatypes::arrow::compute::TakeOptions;
-use datatypes::arrow::datatypes::{DataType as ArrowDataType, SchemaRef, UInt16Type};
+use datatypes::arrow::datatypes::{DataType as ArrowDataType, SchemaRef};
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
@@ -40,6 +40,7 @@ use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu,
use crate::memtable::key_values::KeyValuesRef;
use crate::read::Batch;
use crate::row_converter::{McmpRowCodec, RowCodec};
+use crate::sst::parquet::format::PrimaryKeyArray;
use crate::sst::to_sst_arrow_schema;

#[derive(Debug)]
@@ -344,17 +345,17 @@ fn timestamp_array_to_iter(
}

/// Converts a **sorted** [BinaryArray] to [DictionaryArray].
-fn binary_array_to_dictionary(input: &BinaryArray) -> Result<DictionaryArray<UInt16Type>> {
+fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
if input.is_empty() {
return Ok(DictionaryArray::new(
-UInt16Array::from(Vec::<u16>::new()),
+UInt32Array::from(Vec::<u32>::new()),
Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
));
}
let mut keys = Vec::with_capacity(16);
let mut values = BinaryBuilder::new();
let mut prev: usize = 0;
-keys.push(prev as u16);
+keys.push(prev as u32);
values.append_value(input.value(prev));

for current_bytes in input.iter().skip(1) {
@@ -365,11 +366,11 @@ fn binary_array_to_dictionary(input: &BinaryArray) -> Result<DictionaryArray<UIn
values.append_value(current_bytes);
prev += 1;
}
-keys.push(prev as u16);
+keys.push(prev as u32);
}

Ok(DictionaryArray::new(
-UInt16Array::from(keys),
+UInt32Array::from(keys),
Arc::new(values.finish()) as ArrayRef,
))
}
@@ -387,7 +388,7 @@ mod tests {

fn check_binary_array_to_dictionary(
input: &[&[u8]],
-expected_keys: &[u16],
+expected_keys: &[u32],
expected_values: &[&[u8]],
) {
let input = BinaryArray::from_iter_values(input.iter());
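
For orientation, the function touched above deduplicates a sorted binary array into a dictionary: it appends a new dictionary value only when the bytes differ from the previous row and pushes the running index as a u32 key. A minimal standalone sketch of that idea with plain arrow-rs types follows; the name `sorted_binary_to_dictionary` is hypothetical and the project's `Result`/error handling is omitted.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array};
use arrow::datatypes::UInt32Type;

/// Deduplicates a *sorted* binary array into (u32 keys, binary values).
/// Sketch of the idea only; not the project's `binary_array_to_dictionary`.
fn sorted_binary_to_dictionary(input: &BinaryArray) -> DictionaryArray<UInt32Type> {
    let mut keys: Vec<u32> = Vec::with_capacity(input.len());
    let mut values = BinaryBuilder::new();
    let mut current_key: u32 = 0;

    for i in 0..input.len() {
        let bytes = input.value(i);
        if i == 0 {
            values.append_value(bytes);
        } else if bytes != input.value(i - 1) {
            // The input is sorted, so a change in bytes starts a new distinct value.
            values.append_value(bytes);
            current_key += 1;
        }
        keys.push(current_key);
    }

    DictionaryArray::new(UInt32Array::from(keys), Arc::new(values.finish()) as ArrayRef)
}

fn main() {
    let input =
        BinaryArray::from_iter_values([b"a".as_slice(), b"a".as_slice(), b"b".as_slice()]);
    let dict = sorted_binary_to_dictionary(&input);
    assert_eq!(dict.values().len(), 2); // "a" and "b"
    assert_eq!(dict.keys().value(2), 1); // the third row points at "b"
}
```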
src/mito2/src/sst.rs (1 addition, 1 deletion)
@@ -69,7 +69,7 @@ fn internal_fields() -> [FieldRef; 3] {
[
Arc::new(Field::new_dictionary(
PRIMARY_KEY_COLUMN_NAME,
-ArrowDataType::UInt16,
+ArrowDataType::UInt32,
ArrowDataType::Binary,
false,
)),
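
This one-line change redeclares the `__primary_key` internal field with UInt32 dictionary keys. As a sketch of what that declaration resolves to, using arrow-rs directly rather than the project's `internal_fields()`:

```rust
use arrow::datatypes::{DataType, Field};

fn main() {
    // The `__primary_key` internal column after the patch:
    // dictionary-encoded binary values with non-nullable u32 keys.
    let pk_field = Field::new_dictionary(
        "__primary_key",
        DataType::UInt32,
        DataType::Binary,
        false,
    );
    assert_eq!(
        pk_field.data_type(),
        &DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary))
    );
}
```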
src/mito2/src/sst/parquet/format.rs (14 additions, 11 deletions)
@@ -15,7 +15,7 @@
//! Format to store in parquet.
//!
//! We store three internal columns in parquet:
-//! - `__primary_key`, the primary key of the row (tags). Type: dictionary(uint16, binary)
+//! - `__primary_key`, the primary key of the row (tags). Type: dictionary(uint32, binary)
//! - `__sequence`, the sequence number of a row. Type: uint64
//! - `__op_type`, the op type of the row. Type: uint8
//!
@@ -32,8 +32,8 @@ use std::sync::Arc;

use api::v1::SemanticType;
use datafusion_common::ScalarValue;
-use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt16Array, UInt64Array};
-use datatypes::arrow::datatypes::{SchemaRef, UInt16Type};
+use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array};
+use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, Vector};
@@ -50,6 +50,9 @@ use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::sst::to_sst_arrow_schema;

+/// Arrow array type for the primary key dictionary.
+pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;
+
/// Number of columns that have fixed positions.
///
/// Contains: time index and internal columns.
@@ -230,7 +233,7 @@ impl ReadFormat {
// Compute primary key offsets.
let pk_dict_array = pk_array
.as_any()
-.downcast_ref::<DictionaryArray<UInt16Type>>()
+.downcast_ref::<PrimaryKeyArray>()
.with_context(|| InvalidRecordBatchSnafu {
reason: format!("primary key array should not be {:?}", pk_array.data_type()),
})?;
@@ -255,7 +258,7 @@ impl ReadFormat {
let end = offsets[i + 1];
let rows_in_batch = end - start;
let dict_key = keys.value(*start);
-let primary_key = pk_values.value(dict_key.into()).to_vec();
+let primary_key = pk_values.value(dict_key as usize).to_vec();

let mut builder = BatchBuilder::new(primary_key);
builder
@@ -524,7 +527,7 @@ impl ReadFormat {
}

/// Compute offsets of different primary keys in the array.
-fn primary_key_offsets(pk_dict_array: &DictionaryArray<UInt16Type>) -> Result<Vec<usize>> {
+fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
if pk_dict_array.is_empty() {
return Ok(Vec::new());
}
@@ -549,7 +552,7 @@ fn primary_key_offsets(pk_dict_array: &DictionaryArray<UInt16Type>) -> Result<Ve
/// Creates a new array for specific `primary_key`.
fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
let values = Arc::new(BinaryArray::from_iter_values([primary_key]));
-let keys = UInt16Array::from_value(0, num_rows);
+let keys = UInt32Array::from_value(0, num_rows);

// Safety: The key index is valid.
Arc::new(DictionaryArray::new(keys, values))
@@ -627,7 +630,7 @@ mod tests {
Field::new(
"__primary_key",
ArrowDataType::Dictionary(
-Box::new(ArrowDataType::UInt16),
+Box::new(ArrowDataType::UInt32),
Box::new(ArrowDataType::Binary),
),
false,
@@ -674,15 +677,15 @@ mod tests {
assert_eq!(&expect, &array);
}

-fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<DictionaryArray<UInt16Type>> {
+fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<PrimaryKeyArray> {
let values = Arc::new(BinaryArray::from_iter_values(
pk_row_nums.iter().map(|v| &v.0),
));
let mut keys = vec![];
for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
-keys.extend(std::iter::repeat(index as u16).take(num_rows));
+keys.extend(std::iter::repeat(index as u32).take(num_rows));
}
-let keys = UInt16Array::from(keys);
+let keys = UInt32Array::from(keys);
Arc::new(DictionaryArray::new(keys, values))
}

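
The read path in this file groups rows by primary key using the offsets at which the dictionary key changes, then builds one `Batch` per run. The sketch below illustrates that offset computation over the widened `DictionaryArray<UInt32Type>`; the function name `key_run_offsets` and the sample keys are assumptions for illustration, not the project's `primary_key_offsets` itself.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, BinaryArray, DictionaryArray, UInt32Array};
use arrow::datatypes::UInt32Type;

/// Offsets of runs of equal dictionary keys, ending with the total length,
/// e.g. keys [0, 0, 1, 1, 1] -> [0, 2, 5]. Sketch of the idea behind
/// `primary_key_offsets`; not the project's implementation.
fn key_run_offsets(array: &DictionaryArray<UInt32Type>) -> Vec<usize> {
    if array.is_empty() {
        return Vec::new();
    }
    let keys = array.keys();
    let mut offsets = vec![0];
    for i in 1..keys.len() {
        if keys.value(i) != keys.value(i - 1) {
            offsets.push(i);
        }
    }
    offsets.push(keys.len());
    offsets
}

fn main() {
    let values: ArrayRef = Arc::new(BinaryArray::from_iter_values([
        b"k0".as_slice(),
        b"k1".as_slice(),
    ]));
    let keys = UInt32Array::from(vec![0u32, 0, 1, 1, 1]);
    let dict = DictionaryArray::new(keys, values);
    // Rows [0, 2) share the first primary key; rows [2, 5) share the second.
    assert_eq!(key_run_offsets(&dict), vec![0, 2, 5]);
}
```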
