diff --git a/datafusion-examples/examples/large_string_sort.rs b/datafusion-examples/examples/large_string_sort.rs
new file mode 100644
index 000000000000..878f96904aec
--- /dev/null
+++ b/datafusion-examples/examples/large_string_sort.rs
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{RecordBatch, StringBuilder};
+use arrow_schema::{DataType, Field, Schema};
+use datafusion::execution::disk_manager::DiskManagerConfig;
+use datafusion::execution::memory_pool::FairSpillPool;
+use datafusion::execution::runtime_env::RuntimeEnvBuilder;
+use datafusion::execution::SessionStateBuilder;
+use datafusion::parquet::arrow::ArrowWriter;
+use datafusion::prelude::{ParquetReadOptions, SessionContext};
+use futures::TryStreamExt;
+use std::path::Path;
+use std::sync::Arc;
+
+/// Location of the generated Parquet input; an existing file is reused as-is.
+const PARQUET_PATH: &str = "/tmp/big_strings.parquet";
+/// Number of record batches written to the input file.
+const BATCH_COUNT: usize = 100;
+/// Number of rows in each record batch.
+const ROWS_PER_BATCH: usize = 1024 * 1024;
+/// Upper bound on the bytes of one generated value: `string-{i}` is at most
+/// 14 bytes (`i` has at most 7 digits) and is repeated three times.
+const MAX_VALUE_BYTES: usize = 3 * 14;
+/// Size of the memory pool the query runs under; the generated strings total
+/// several GiB in memory, i.e. far more than this limit.
+const MEMORY_LIMIT_BYTES: usize = 100 * 1024 * 1024;
+
+/// Generates the input file if needed, then sorts it under the memory limit
+/// and discards the sorted output.
+#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
+pub async fn main() {
+    build_parquet();
+
+    let env = RuntimeEnvBuilder::new()
+        .with_disk_manager(DiskManagerConfig::default())
+        .with_memory_pool(Arc::new(FairSpillPool::new(MEMORY_LIMIT_BYTES)))
+        .build_arc()
+        .unwrap();
+
+    let state = SessionStateBuilder::new().with_runtime_env(env).build();
+
+    let ctx = SessionContext::from(state);
+
+    ctx.register_parquet("big_strings", PARQUET_PATH, ParquetReadOptions::default())
+        .await
+        .unwrap();
+
+    let sql = "SELECT * FROM big_strings ORDER BY strings";
+    println!("Sorting strings");
+    ctx.sql(sql)
+        .await
+        .unwrap()
+        .execute_stream()
+        .await
+        .unwrap()
+        // Only the sort itself is of interest, so every output batch is dropped.
+        .try_for_each(|_| std::future::ready(Ok(())))
+        .await
+        .unwrap();
+}
+
+/// Writes the Parquet input to [`PARQUET_PATH`] unless that file already exists.
+///
+/// The data goes to a sibling `.partial` file that is renamed into place only
+/// after the writer has been closed, so an interrupted run cannot leave behind
+/// a truncated file that a later run would reuse.
+fn build_parquet() {
+    if Path::new(PARQUET_PATH).exists() {
+        println!("Using existing file at {}", PARQUET_PATH);
+        return;
+    }
+    println!("Generating test file at {}", PARQUET_PATH);
+    let partial_path = format!("{}.partial", PARQUET_PATH);
+    let file = std::fs::File::create(&partial_path).unwrap();
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "strings",
+        DataType::Utf8,
+        false,
+    )]));
+    let mut writer = ArrowWriter::try_new(file, Arc::clone(&schema), None).unwrap();
+
+    for batch_idx in 0..BATCH_COUNT {
+        // Report 1-based progress so the last message reads "100 of 100".
+        println!("Generating batch {} of {}", batch_idx + 1, BATCH_COUNT);
+        let mut string_array_builder = StringBuilder::with_capacity(
+            ROWS_PER_BATCH,
+            ROWS_PER_BATCH * MAX_VALUE_BYTES,
+        );
+        for i in 0..ROWS_PER_BATCH {
+            string_array_builder
+                .append_value(format!("string-{}string-{}string-{}", i, i, i));
+        }
+        let array = Arc::new(string_array_builder.finish());
+        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![array]).unwrap();
+        writer.write(&batch).unwrap();
+    }
+    writer.close().unwrap();
+    std::fs::rename(&partial_path, PARQUET_PATH).unwrap();
+}
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 33c8a2b2fee3..574737bbbaf8 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -402,6 +402,7 @@ impl ExternalSorter {
     ///
     /// Returns the amount of memory freed.
     async fn spill(&mut self) -> Result {
+        println!("Spilling");
         // we could always get a chance to free some memory as long as we are holding some
         if self.in_mem_batches.is_empty() {
             return Ok(0);
@@ -437,6 +438,13 @@ impl ExternalSorter {
         // allocation.
         self.merge_reservation.free();
 
+        let size: usize = self
+            .in_mem_batches
+            .iter()
+            .map(get_record_batch_memory_size)
+            .sum();
+        println!("Before sorting we have {} bytes of unsorted data", size);
+
         self.in_mem_batches = self
             .in_mem_sort_stream(self.metrics.baseline.intermediate())?
             .try_collect()
@@ -448,9 +456,12 @@ impl ExternalSorter {
             .map(get_record_batch_memory_size)
             .sum();
 
+        println!("After sorting we now have {} bytes of sorted data", size);
+
         // Reserve headroom for next sort/merge
         self.reserve_memory_for_merge()?;
 
+        // This is where the error occurs
         self.reservation.try_resize(size)?;
         self.in_mem_batches_sorted = true;
         Ok(())