Add example showing sort error
westonpace committed Jan 6, 2025
1 parent a6e17f1 commit 26ed75c
Showing 2 changed files with 102 additions and 0 deletions.
datafusion-examples/examples/large_string_sort.rs (91 additions, 0 deletions)
@@ -0,0 +1,91 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{RecordBatch, StringBuilder};
use arrow_schema::{DataType, Field, Schema};
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::SessionStateBuilder;
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use futures::TryStreamExt;
use std::sync::Arc;

#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
pub async fn main() {
    build_parquet();

    // A 100 MiB memory pool plus the default disk manager forces the sort
    // below to spill to disk instead of buffering everything in memory.
    let env = RuntimeEnvBuilder::new()
        .with_disk_manager(DiskManagerConfig::default())
        .with_memory_pool(Arc::new(FairSpillPool::new(100 * 1024 * 1024)))
        .build_arc()
        .unwrap();

    let state = SessionStateBuilder::new().with_runtime_env(env).build();

    let ctx = SessionContext::from(state);

    ctx.register_parquet(
        "big_strings",
        "/tmp/big_strings.parquet",
        ParquetReadOptions::default(),
    )
    .await
    .unwrap();

    let sql = "SELECT * FROM big_strings ORDER BY strings";
    println!("Sorting strings");
    // Drain the stream; the sort (and the error this example demonstrates)
    // happens while the stream executes.
    ctx.sql(sql)
        .await
        .unwrap()
        .execute_stream()
        .await
        .unwrap()
        .try_for_each(|_| std::future::ready(Ok(())))
        .await
        .unwrap();
}

fn build_parquet() {
    if std::fs::File::open("/tmp/big_strings.parquet").is_ok() {
        println!("Using existing file at /tmp/big_strings.parquet");
        return;
    }
    println!("Generating test file at /tmp/big_strings.parquet");
    let file = std::fs::File::create("/tmp/big_strings.parquet").unwrap();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "strings",
        DataType::Utf8,
        false,
    )]));
    let mut writer = ArrowWriter::try_new(file, schema.clone(), None).unwrap();

    // 100 batches of 1Mi rows each, so the file holds roughly 105 million
    // short strings (several GiB of string data once decoded).
    for batch_idx in 0..100 {
        println!("Generating batch {} of 100", batch_idx);
        let mut string_array_builder =
            StringBuilder::with_capacity(1024 * 1024, 1024 * 1024 * 3 * 14);
        for i in 0..(1024 * 1024) {
            string_array_builder
                .append_value(format!("string-{}string-{}string-{}", i, i, i));
        }
        let array = Arc::new(string_array_builder.finish());
        let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap();
        writer.write(&batch).unwrap();
    }
    writer.close().unwrap();
}
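Assuming the usual datafusion-examples layout, this reproduction can presumably be run with `cargo run --example large_string_sort`; note that the first run generates a multi-gigabyte `/tmp/big_strings.parquet` before the sort starts.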
datafusion/physical-plan/src/sorts/sort.rs (11 additions, 0 deletions)
@@ -402,6 +402,7 @@ impl ExternalSorter {
    ///
    /// Returns the amount of memory freed.
    async fn spill(&mut self) -> Result<usize> {
        println!("Spilling");
        // we could always get a chance to free some memory as long as we are holding some
        if self.in_mem_batches.is_empty() {
            return Ok(0);
@@ -437,6 +438,13 @@
        // allocation.
        self.merge_reservation.free();

        let size: usize = self
            .in_mem_batches
            .iter()
            .map(get_record_batch_memory_size)
            .sum();
        println!("Before sorting we have {} bytes of unsorted data", size);

        self.in_mem_batches = self
            .in_mem_sort_stream(self.metrics.baseline.intermediate())?
            .try_collect()
@@ -448,9 +456,12 @@
            .map(get_record_batch_memory_size)
            .sum();

        println!("After sorting we now have {} bytes of sorted data", size);

        // Reserve headroom for next sort/merge
        self.reserve_memory_for_merge()?;

        // This is where the error occurs
        self.reservation.try_resize(size)?;
        self.in_mem_batches_sorted = true;
        Ok(())
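The failing line is the `try_resize` call: after sorting, the reservation is resized to the newly measured size of the sorted batches, and the two `println!`s above suggest this size can come out larger than the unsorted size, so the resize can exceed what the pool will grant. Below is a minimal, self-contained sketch of that error path; it uses `GreedyMemoryPool` rather than the example's `FairSpillPool` only to keep the accounting simple, and the consumer name and byte counts are arbitrary:

```rust
use std::sync::Arc;

use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() {
    // A pool that will never grant more than 100 bytes in total.
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(100));
    let mut reservation = MemoryConsumer::new("sort").register(&pool);

    // Growing within the limit succeeds.
    reservation.try_resize(80).unwrap();

    // Growing past what the pool can grant fails with a resources-exhausted
    // error, the same error path as `self.reservation.try_resize(size)?`
    // above when `size` grows after sorting.
    let err = reservation.try_resize(120).unwrap_err();
    println!("{err}");
}
```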
