Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve in-place primitive sorts by 13-67% #4473

Merged
merged 6 commits into from
Jul 4, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 207 additions & 4 deletions arrow-ord/src/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ use arrow_array::builder::BufferBuilder;
use arrow_array::cast::*;
use arrow_array::types::*;
use arrow_array::*;
use arrow_buffer::Buffer;
use arrow_buffer::{ArrowNativeType, MutableBuffer, NullBuffer};
use arrow_data::ArrayData;
use arrow_data::ArrayDataBuilder;
use arrow_schema::{ArrowError, DataType, IntervalUnit, TimeUnit};
use arrow_select::take::take;
use core::slice;
use std::cmp::min;
use std::cmp::Ordering;
use std::sync::Arc;

Expand Down Expand Up @@ -57,11 +60,211 @@ pub fn sort(
values: &dyn Array,
options: Option<SortOptions>,
) -> Result<ArrayRef, ArrowError> {
if let DataType::RunEndEncoded(_, _) = values.data_type() {
return sort_run(values, options, None);
match values.data_type() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you changed sort_native_type to take PrimitiveArray instead of dyn Array you could use the downcast_primitive_array macro here

DataType::Int8 => sort_native_type::<Int8Type, i8>(values, options),
DataType::Int16 => sort_native_type::<Int16Type, i16>(values, options),
DataType::Int32 => sort_native_type::<Int32Type, i32>(values, options),
DataType::Int64 => sort_native_type::<Int64Type, i64>(values, options),
DataType::UInt8 => sort_native_type::<UInt8Type, u8>(values, options),
DataType::UInt16 => sort_native_type::<UInt16Type, u16>(values, options),
DataType::UInt32 => sort_native_type::<UInt32Type, u32>(values, options),
DataType::UInt64 => sort_native_type::<UInt64Type, u64>(values, options),
DataType::Float32 => sort_native_type::<Float32Type, f32>(values, options),
DataType::Float64 => sort_native_type::<Float64Type, f64>(values, options),
DataType::Date32 => sort_native_type::<Date32Type, i32>(values, options),
DataType::Date64 => sort_native_type::<Date64Type, i64>(values, options),
DataType::Time32(TimeUnit::Second) => {
sort_native_type::<Time32SecondType, i32>(values, options)
}
DataType::Time32(TimeUnit::Millisecond) => {
sort_native_type::<Time32MillisecondType, i32>(values, options)
}
DataType::Time64(TimeUnit::Microsecond) => {
sort_native_type::<Time64MicrosecondType, i64>(values, options)
}
DataType::Time64(TimeUnit::Nanosecond) => {
sort_native_type::<Time64NanosecondType, i64>(values, options)
}
DataType::Timestamp(TimeUnit::Second, _) => {
sort_native_type::<TimestampSecondType, i64>(values, options)
}
DataType::Timestamp(TimeUnit::Millisecond, _) => {
sort_native_type::<TimestampMillisecondType, i64>(values, options)
}
DataType::Timestamp(TimeUnit::Microsecond, _) => {
sort_native_type::<TimestampMicrosecondType, i64>(values, options)
}
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
sort_native_type::<TimestampNanosecondType, i64>(values, options)
}
DataType::Interval(IntervalUnit::YearMonth) => {
sort_native_type::<IntervalYearMonthType, i32>(values, options)
}
DataType::Interval(IntervalUnit::DayTime) => {
sort_native_type::<IntervalDayTimeType, i64>(values, options)
}
DataType::Interval(IntervalUnit::MonthDayNano) => {
sort_native_type::<IntervalMonthDayNanoType, i128>(values, options)
}
DataType::Duration(TimeUnit::Second) => {
sort_native_type::<DurationSecondType, i64>(values, options)
}
DataType::Duration(TimeUnit::Millisecond) => {
sort_native_type::<DurationMillisecondType, i64>(values, options)
}
DataType::Duration(TimeUnit::Microsecond) => {
sort_native_type::<DurationMicrosecondType, i64>(values, options)
}
DataType::Duration(TimeUnit::Nanosecond) => {
sort_native_type::<DurationNanosecondType, i64>(values, options)
}
DataType::RunEndEncoded(_, _) => sort_run(values, options, None),
_ => {
let indices = sort_to_indices(values, options, None)?;
take(values, &indices, None)
}
}
let indices = sort_to_indices(values, options, None)?;
take(values, &indices, None)
}

fn compress_store<U>(input: *const U, mut output: *mut U, mask: u8) -> isize
psvri marked this conversation as resolved.
Show resolved Hide resolved
where
U: ArrowNativeType,
{
let mut offset = 0;
if mask != 0 {
for i in 0..8 {
if (mask & (1 << i)) != 0 {
// This is safe since a valid bit i.e bit set to 1 indicates a valid value
unsafe {
*output = *input.offset(i);
offset += 1;
output = output.offset(1);
}
}
}
}
offset
}

fn create_null_buffer(
valid_count: usize,
nulls_count: usize,
length: usize,
sort_options: SortOptions,
) -> Option<Buffer> {
let null_capacity = (length / 8) + (length % 8 != 0) as usize;
psvri marked this conversation as resolved.
Show resolved Hide resolved
let mut mutable_null_buffer = MutableBuffer::new(null_capacity * 8);
mutable_null_buffer.resize(null_capacity, 0);

let mutable_null_buffer_slice = mutable_null_buffer.as_slice_mut();

if valid_count > 0 {
let mut count = valid_count;
let mut index = 0;
if sort_options.nulls_first {
let remaining_nulls = nulls_count % 8;
index = nulls_count / 8;

if remaining_nulls != 0 {
let valid_values_count = min(8 - remaining_nulls, valid_count);
mutable_null_buffer_slice[index] =
((1 << valid_values_count) - 1) << remaining_nulls;
count -= valid_values_count;
index += 1;
}
}
while count >= 8 {
mutable_null_buffer_slice[index] = u8::MAX;
index += 1;
count -= 8;
}
if count != 0 {
mutable_null_buffer_slice[index] = (1 << count) - 1;
}
}

Some(mutable_null_buffer.into())
}

fn sort_native_type<T, U>(
values: &dyn Array,
options: Option<SortOptions>,
) -> Result<ArrayRef, ArrowError>
where
T: ArrowPrimitiveType,
U: ArrowNativeTypeOp,
psvri marked this conversation as resolved.
Show resolved Hide resolved
{
let sort_options = options.unwrap_or_default();
let values = values.as_primitive::<T>();

let result_capacity = values.len() * std::mem::size_of::<U>();
let mut mutable_buffer = MutableBuffer::new(result_capacity);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you considered just using Vec here?

mutable_buffer.resize(result_capacity, 0);
let mutable_slice: &mut [U] = mutable_buffer.typed_data_mut();

let array_data = values.to_data();
let input_values: &[U] = array_data.buffer(0);

let mut null_bit_buffer = None;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be nicer to use an expression style here, rather than using mut

e.g.

let nulls = match array.nulls().filter(|n| n.null_count() > 0) {
Some(nulls) => ...,
None => ...
}


let nulls_count = values.null_count();
let valid_count = values.len() - nulls_count;

if values.null_count() > 0 {
let nulls = array_data.nulls().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if values.null_count() > 0 {
let nulls = array_data.nulls().unwrap();
if let Some(nulls) = array.nulls().filter(|n| n.null_count() > 0) {

let null_buffer = nulls.buffer().as_slice();

let mut mutable_slice_ptr = mutable_slice.as_mut_ptr();
let mut input_values_ptr = input_values.as_ptr();

if sort_options.nulls_first {
// This is safe since the offset in in bounds
unsafe {
mutable_slice_ptr = mutable_slice_ptr.add(values.null_count());
}
}

// This is safe since we are in bounds
let values_slice =
unsafe { slice::from_raw_parts_mut(mutable_slice_ptr, valid_count) };

for mask in null_buffer {
psvri marked this conversation as resolved.
Show resolved Hide resolved
let written_count =
compress_store::<U>(input_values_ptr, mutable_slice_ptr, *mask);
// This is safe as the offset increments are within bounds
unsafe {
input_values_ptr = input_values_ptr.offset(8);
mutable_slice_ptr = mutable_slice_ptr.offset(written_count);
}
}

values_slice.sort_unstable_by(|a, b| a.compare(*b));
if sort_options.descending {
values_slice.reverse();
}

null_bit_buffer =
create_null_buffer(valid_count, nulls_count, values.len(), sort_options);
} else {
mutable_slice.copy_from_slice(input_values);
mutable_slice.sort_unstable_by(|a, b| a.compare(*b));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
mutable_slice.sort_unstable_by(|a, b| a.compare(*b));
mutable_slice.sort_unstable();

Should be the same?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not for floats, we use total ordering not the default partial ordering

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah forgot about that :)

if sort_options.descending {
mutable_slice.reverse();
}
Comment on lines +119 to +122
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like it should be faster for the descending case?

Suggested change
mutable_slice.sort_unstable_by(|a, b| a.compare(*b));
if sort_options.descending {
mutable_slice.reverse();
}
if sort_options.descending {
mutable_slice.sort_unstable_by(|a, b| b.compare(*a));
} else {
mutable_slice.sort_unstable_by(|a, b| a.compare(*b));
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested it just now on my laptop. The difference is only b/n 2-3% .

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for checking. I would argue it's also a bit more simple :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think given the marginal speed difference it makes sense to save on codegen by using reverse 👍

}
// This is safe since data types match
psvri marked this conversation as resolved.
Show resolved Hide resolved
let result_array = unsafe {
ArrayData::new_unchecked(
values.data_type().clone(),
values.len(),
Some(nulls_count),
null_bit_buffer,
0,
vec![mutable_buffer.into()],
vec![],
)
};
Ok(Arc::new(PrimitiveArray::<T>::from(result_array)))
}

/// Sort the `ArrayRef` partially.
Expand Down
5 changes: 5 additions & 0 deletions arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ name = "sort_kernel"
harness = false
required-features = ["test_utils"]

[[bench]]
name = "sort_kernel_primitives"
harness = false
required-features = ["test_utils"]

[[bench]]
name = "partition_kernels"
harness = false
Expand Down
59 changes: 59 additions & 0 deletions arrow/benches/sort_kernel_primitives.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#[macro_use]
extern crate criterion;
use arrow_ord::sort::sort;
use criterion::Criterion;

use std::sync::Arc;

extern crate arrow;

use arrow::util::bench_util::*;
use arrow::{array::*, datatypes::Int64Type};

fn create_i64_array(size: usize, with_nulls: bool) -> ArrayRef {
let null_density = if with_nulls { 0.5 } else { 0.0 };
let array = create_primitive_array::<Int64Type>(size, null_density);
Arc::new(array)
}

fn bench_sort(array: &ArrayRef) {
criterion::black_box(sort(criterion::black_box(array), None).unwrap());
}

fn add_benchmark(c: &mut Criterion) {
let arr_a = create_i64_array(2u64.pow(10) as usize, false);

c.bench_function("sort 2^10", |b| b.iter(|| bench_sort(&arr_a)));

let arr_a = create_i64_array(2u64.pow(12) as usize, false);

c.bench_function("sort 2^12", |b| b.iter(|| bench_sort(&arr_a)));

let arr_a = create_i64_array(2u64.pow(10) as usize, true);

c.bench_function("sort nulls 2^10", |b| b.iter(|| bench_sort(&arr_a)));

let arr_a = create_i64_array(2u64.pow(12) as usize, true);

c.bench_function("sort nulls 2^12", |b| b.iter(|| bench_sort(&arr_a)));
}

criterion_group!(benches, add_benchmark);
criterion_main!(benches);