Skip to content

Commit

Permalink
Sort binary (#569)
Browse files Browse the repository at this point in the history
* impl sort fixed binary array

Signed-off-by: Ruihang Xia <[email protected]>

* remove array builder util, add test cases

Signed-off-by: Ruihang Xia <[email protected]>

* impl sort for generic binary array

Signed-off-by: Ruihang Xia <[email protected]>

* tidy

Signed-off-by: Ruihang Xia <[email protected]>

* run clippy

Signed-off-by: Ruihang Xia <[email protected]>

* test: fixed binary with different prefix

Signed-off-by: Ruihang Xia <[email protected]>

* rebase master

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored and alamb committed Jul 27, 2021
1 parent d32533a commit 90a4a8d
Show file tree
Hide file tree
Showing 4 changed files with 404 additions and 3 deletions.
10 changes: 10 additions & 0 deletions arrow/src/array/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ pub fn as_large_list_array(arr: &ArrayRef) -> &LargeListArray {
as_generic_list_array::<i64>(arr)
}

#[doc = "Force downcast ArrayRef to GenericBinaryArray"]
#[inline]
pub fn as_generic_binary_array<S: BinaryOffsetSizeTrait>(
arr: &ArrayRef,
) -> &GenericBinaryArray<S> {
arr.as_any()
.downcast_ref::<GenericBinaryArray<S>>()
.expect("Unable to downcast to binary array")
}

macro_rules! array_downcast_fn {
($name: ident, $arrty: ty, $arrty_str:expr) => {
#[doc = "Force downcast ArrayRef to "]
Expand Down
6 changes: 3 additions & 3 deletions arrow/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,9 @@ pub use self::ord::{build_compare, DynComparator};
// --------------------- Array downcast helper functions ---------------------

pub use self::cast::{
as_boolean_array, as_dictionary_array, as_generic_list_array, as_large_list_array,
as_largestring_array, as_list_array, as_null_array, as_primitive_array,
as_string_array, as_struct_array,
as_boolean_array, as_dictionary_array, as_generic_binary_array,
as_generic_list_array, as_large_list_array, as_largestring_array, as_list_array,
as_null_array, as_primitive_array, as_string_array, as_struct_array,
};

// ------------------------------ C Data Interface ---------------------------
Expand Down
317 changes: 317 additions & 0 deletions arrow/src/compute/kernels/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,10 @@ pub fn sort_to_indices(
}
}
}
DataType::Binary | DataType::FixedSizeBinary(_) => {
sort_binary::<i32>(values, v, n, &options, limit)
}
DataType::LargeBinary => sort_binary::<i64>(values, v, n, &options, limit),
t => {
return Err(ArrowError::ComputeError(format!(
"Sort not supported for data type {:?}",
Expand Down Expand Up @@ -758,6 +762,67 @@ where
}
}

fn sort_binary<S>(
values: &ArrayRef,
value_indices: Vec<u32>,
mut null_indices: Vec<u32>,
options: &SortOptions,
limit: Option<usize>,
) -> UInt32Array
where
S: BinaryOffsetSizeTrait,
{
let mut valids: Vec<(u32, &[u8])> = values
.as_any()
.downcast_ref::<FixedSizeBinaryArray>()
.map_or_else(
|| {
let values = as_generic_binary_array::<S>(values);
value_indices
.iter()
.copied()
.map(|index| (index, values.value(index as usize)))
.collect()
},
|values| {
value_indices
.iter()
.copied()
.map(|index| (index, values.value(index as usize)))
.collect()
},
);

let mut len = values.len();
let descending = options.descending;
let nulls_len = null_indices.len();

if let Some(limit) = limit {
len = limit.min(len);
}
if !descending {
sort_unstable_by(&mut valids, len.saturating_sub(nulls_len), |a, b| {
a.1.cmp(b.1)
});
} else {
sort_unstable_by(&mut valids, len.saturating_sub(nulls_len), |a, b| {
a.1.cmp(b.1).reverse()
});
null_indices.reverse();
}

let mut valid_indices: Vec<u32> = valids.iter().map(|tuple| tuple.0).collect();
if options.nulls_first {
null_indices.append(&mut valid_indices);
null_indices.truncate(len);
UInt32Array::from(null_indices)
} else {
valid_indices.append(&mut null_indices);
valid_indices.truncate(len);
UInt32Array::from(valid_indices)
}
}

/// Compare two `Array`s based on the ordering defined in [ord](crate::array::ord).
fn cmp_array(a: &Array, b: &Array) -> Ordering {
let cmp_op = build_compare(a, b).unwrap();
Expand Down Expand Up @@ -1176,6 +1241,60 @@ mod tests {
}
}

fn test_sort_binary_arrays(
data: Vec<Option<Vec<u8>>>,
options: Option<SortOptions>,
limit: Option<usize>,
expected_data: Vec<Option<Vec<u8>>>,
fixed_length: Option<i32>,
) {
// Fixed size binary array
if fixed_length.is_some() {
let input = Arc::new(
FixedSizeBinaryArray::try_from_sparse_iter(data.iter().cloned()).unwrap(),
);
let sorted = match limit {
Some(_) => sort_limit(&(input as ArrayRef), options, limit).unwrap(),
None => sort(&(input as ArrayRef), options).unwrap(),
};
let expected = Arc::new(
FixedSizeBinaryArray::try_from_sparse_iter(expected_data.iter().cloned())
.unwrap(),
) as ArrayRef;

assert_eq!(&sorted, &expected);
}

// Generic size binary array
fn make_generic_binary_array<S: BinaryOffsetSizeTrait>(
data: &[Option<Vec<u8>>],
) -> Arc<GenericBinaryArray<S>> {
Arc::new(GenericBinaryArray::<S>::from_opt_vec(
data.iter()
.map(|binary| binary.as_ref().map(Vec::as_slice))
.collect(),
))
}

// BinaryArray
let input = make_generic_binary_array::<i32>(&data);
let sorted = match limit {
Some(_) => sort_limit(&(input as ArrayRef), options, limit).unwrap(),
None => sort(&(input as ArrayRef), options).unwrap(),
};
let expected = make_generic_binary_array::<i32>(&expected_data) as ArrayRef;
assert_eq!(&sorted, &expected);

// LargeBinaryArray
let input = make_generic_binary_array::<i64>(&data);
let sorted = match limit {
Some(_) => sort_limit(&(input as ArrayRef), options, limit).unwrap(),
None => sort(&(input as ArrayRef), options).unwrap(),
};
let expected = make_generic_binary_array::<i64>(&expected_data) as ArrayRef;
assert_eq!(&sorted, &expected);
}

#[test]
fn test_sort_to_indices_primitives() {
test_sort_to_indices_primitive_arrays::<Int8Type>(
Expand Down Expand Up @@ -2375,6 +2494,204 @@ mod tests {
);
}

#[test]
fn test_sort_binary() {
test_sort_binary_arrays(
vec![
Some(vec![0, 0, 0]),
Some(vec![0, 0, 5]),
Some(vec![0, 0, 3]),
Some(vec![0, 0, 7]),
Some(vec![0, 0, 1]),
],
Some(SortOptions {
descending: false,
nulls_first: false,
}),
None,
vec![
Some(vec![0, 0, 0]),
Some(vec![0, 0, 1]),
Some(vec![0, 0, 3]),
Some(vec![0, 0, 5]),
Some(vec![0, 0, 7]),
],
Some(3),
);

// with nulls
test_sort_binary_arrays(
vec![
Some(vec![0, 0, 0]),
None,
Some(vec![0, 0, 3]),
Some(vec![0, 0, 7]),
Some(vec![0, 0, 1]),
None,
],
Some(SortOptions {
descending: false,
nulls_first: false,
}),
None,
vec![
Some(vec![0, 0, 0]),
Some(vec![0, 0, 1]),
Some(vec![0, 0, 3]),
Some(vec![0, 0, 7]),
None,
None,
],
Some(3),
);

test_sort_binary_arrays(
vec![
Some(vec![3, 5, 7]),
None,
Some(vec![1, 7, 1]),
Some(vec![2, 7, 3]),
None,
Some(vec![1, 4, 3]),
],
Some(SortOptions {
descending: false,
nulls_first: false,
}),
None,
vec![
Some(vec![1, 4, 3]),
Some(vec![1, 7, 1]),
Some(vec![2, 7, 3]),
Some(vec![3, 5, 7]),
None,
None,
],
Some(3),
);

// descending
test_sort_binary_arrays(
vec![
Some(vec![0, 0, 0]),
None,
Some(vec![0, 0, 3]),
Some(vec![0, 0, 7]),
Some(vec![0, 0, 1]),
None,
],
Some(SortOptions {
descending: true,
nulls_first: false,
}),
None,
vec![
Some(vec![0, 0, 7]),
Some(vec![0, 0, 3]),
Some(vec![0, 0, 1]),
Some(vec![0, 0, 0]),
None,
None,
],
Some(3),
);

// nulls first
test_sort_binary_arrays(
vec![
Some(vec![0, 0, 0]),
None,
Some(vec![0, 0, 3]),
Some(vec![0, 0, 7]),
Some(vec![0, 0, 1]),
None,
],
Some(SortOptions {
descending: false,
nulls_first: true,
}),
None,
vec![
None,
None,
Some(vec![0, 0, 0]),
Some(vec![0, 0, 1]),
Some(vec![0, 0, 3]),
Some(vec![0, 0, 7]),
],
Some(3),
);

// limit
test_sort_binary_arrays(
vec![
Some(vec![0, 0, 0]),
None,
Some(vec![0, 0, 3]),
Some(vec![0, 0, 7]),
Some(vec![0, 0, 1]),
None,
],
Some(SortOptions {
descending: false,
nulls_first: true,
}),
Some(4),
vec![None, None, Some(vec![0, 0, 0]), Some(vec![0, 0, 1])],
Some(3),
);

// var length
test_sort_binary_arrays(
vec![
Some(b"Hello".to_vec()),
None,
Some(b"from".to_vec()),
Some(b"Apache".to_vec()),
Some(b"Arrow-rs".to_vec()),
None,
],
Some(SortOptions {
descending: false,
nulls_first: false,
}),
None,
vec![
Some(b"Apache".to_vec()),
Some(b"Arrow-rs".to_vec()),
Some(b"Hello".to_vec()),
Some(b"from".to_vec()),
None,
None,
],
None,
);

// limit
test_sort_binary_arrays(
vec![
Some(b"Hello".to_vec()),
None,
Some(b"from".to_vec()),
Some(b"Apache".to_vec()),
Some(b"Arrow-rs".to_vec()),
None,
],
Some(SortOptions {
descending: false,
nulls_first: true,
}),
Some(4),
vec![
None,
None,
Some(b"Apache".to_vec()),
Some(b"Arrow-rs".to_vec()),
],
None,
);
}

#[test]
fn test_lex_sort_single_column() {
let input = vec![SortColumn {
Expand Down
Loading

0 comments on commit 90a4a8d

Please sign in to comment.