Commit
remove usage of parking_lot, wip spilling
yjshen committed Jan 4, 2022
1 parent e44471d commit 6679628
Showing 6 changed files with 219 additions and 93 deletions.
1 change: 0 additions & 1 deletion datafusion/Cargo.toml
@@ -77,7 +77,6 @@ rand = "0.8"
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
num-traits = { version = "0.2", optional = true }
pyo3 = { version = "0.14", optional = true }
parking_lot = "0.11.2"
uuid = { version = "0.8", features = ["v4"] }
tempfile = "3"

15 changes: 7 additions & 8 deletions datafusion/src/execution/memory_management/memory_pool.rs
@@ -19,9 +19,8 @@ use crate::execution::memory_management::{MemoryConsumer, MemoryConsumerId};
use crate::physical_plan::aggregates::return_type;
use hashbrown::HashMap;
use log::{info, warn};
use parking_lot::{Condvar, Mutex};
use std::cmp::{max, min};
use std::sync::Arc;
use std::sync::{Arc, Condvar, Mutex};

pub(crate) trait ExecutionMemoryPool {
fn memory_available(&self) -> usize;
@@ -105,19 +104,19 @@ impl ExecutionMemoryPool for ConstraintExecutionMemoryPool {
}

fn memory_used(&self) -> usize {
let a = self.memory_usage.lock();
let a = self.memory_usage.lock().unwrap();
a.values().sum()
}

fn memory_used_partition(&self, partition_id: usize) -> usize {
let partition_usage = self.memory_usage.lock();
let partition_usage = self.memory_usage.lock().unwrap();
partition_usage[partition_id].unwrap_or(0)
}

fn acquire_memory(&self, required: usize, consumer: &dyn MemoryConsumer) -> usize {
assert!(required > 0);
let partition_id = consumer.partition_id();
let mut partition_usage = self.memory_usage.lock();
let mut partition_usage = self.memory_usage.lock().unwrap();
if !partition_usage.contains_key(&partition_id) {
partition_usage.entry(partition_id).or_insert(0);
self.condvar.notify_all();
@@ -168,7 +167,7 @@ impl ExecutionMemoryPool for ConstraintExecutionMemoryPool {
if granted_size == real_size {
return;
} else {
let mut partition_usage = self.memory_usage.lock();
let mut partition_usage = self.memory_usage.lock().unwrap();
if granted_size > real_size {
partition_usage.entry(consumer.partition_id()) -=
granted_size - real_size;
@@ -182,7 +181,7 @@ impl ExecutionMemoryPool for ConstraintExecutionMemoryPool {
}

fn release_memory(&self, release_size: usize, partition_id: usize) {
let mut partition_usage = self.memory_usage.lock();
let mut partition_usage = self.memory_usage.lock().unwrap();
let current_mem = partition_usage[partition_id].unwrap_or(0);
let to_free = if current_mem < release_size {
warn!(
@@ -203,7 +202,7 @@ impl ExecutionMemoryPool for ConstraintExecutionMemoryPool {
}

fn release_all(&self, partition_id: usize) -> usize {
let mut partition_usage = self.memory_usage.lock();
let mut partition_usage = self.memory_usage.lock().unwrap();
let current_mem = partition_usage[partition_id].unwrap_or(0);
if current_mem == 0 {
return 0;
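
A note on the locking change above: parking_lot's Mutex::lock() returns the guard directly, while std::sync::Mutex::lock() returns a Result that reports lock poisoning, which is why every call site gains a .unwrap(). The same applies to the Condvar swap: std::sync::Condvar::wait takes the guard by value and returns it inside a Result, unlike parking_lot's wait(&mut guard). A minimal sketch of the std pattern, using a hypothetical Pool struct rather than the actual ConstraintExecutionMemoryPool:

use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical pool-like struct, only to illustrate the lock-call change.
struct Pool {
    memory_usage: Mutex<HashMap<usize, usize>>,
}

impl Pool {
    fn memory_used(&self) -> usize {
        // parking_lot: `self.memory_usage.lock()` yields the guard directly.
        // std::sync: `lock()` returns Result<MutexGuard<_>, PoisonError<_>>,
        // so the guard is obtained with `.unwrap()` (panics if the lock is poisoned).
        let usage = self.memory_usage.lock().unwrap();
        usage.values().sum()
    }
}

fn main() {
    let pool = Pool {
        memory_usage: Mutex::new(HashMap::from([(0, 64), (1, 128)])),
    };
    assert_eq!(pool.memory_used(), 192);
}
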
9 changes: 4 additions & 5 deletions datafusion/src/execution/memory_management/mod.rs
@@ -26,11 +26,10 @@ use crate::execution::memory_management::memory_pool::{
use async_trait::async_trait;
use hashbrown::{HashMap, HashSet};
use log::{debug, info};
use parking_lot::Mutex;
use std::fmt;
use std::fmt::{Display, Formatter};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};

static mut CONSUMER_ID: AtomicUsize = AtomicUsize::new(0);

@@ -60,7 +59,7 @@ impl MemoryManager {
) -> Result<usize> {
let partition_id = consumer.partition_id();
let partition_manager = {
let mut all_managers = self.partition_memory_manager.lock();
let mut all_managers = self.partition_memory_manager.lock().unwrap();
all_managers.entry(partition_id).or_insert_with(|| {
PartitionMemoryManager::new(partition_id, self.clone())
})
@@ -128,7 +127,7 @@ impl PartitionMemoryManager {
required: usize,
consumer: &dyn MemoryConsumer,
) -> Result<usize> {
let mut consumers = self.consumers.lock();
let mut consumers = self.consumers.lock().unwrap();
let mut got = self
.memory_manager
.acquire_exec_pool_memory(required, consumer);
@@ -154,7 +153,7 @@ impl PartitionMemoryManager {

pub fn show_memory_usage(&self) {
info!("Memory usage for partition {}", self.partition_id);
let mut consumers = self.consumers.lock();
let mut consumers = self.consumers.lock().unwrap();
let mut used = 0;
for c in consumers.iter() {
let cur_used = c.get_used();
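
The partition_memory_manager change above keeps the lazily-created per-partition managers behind a std Mutex, creating one via entry().or_insert_with() the first time a partition acquires memory. A minimal, hypothetical get-or-create sketch of that pattern (Registry and PartitionState are illustrative names, not DataFusion types):

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Stand-in for PartitionMemoryManager; real bookkeeping omitted.
#[derive(Default)]
struct PartitionState;

#[derive(Default)]
struct Registry {
    per_partition: Mutex<HashMap<usize, Arc<PartitionState>>>,
}

impl Registry {
    fn get_or_create(&self, partition_id: usize) -> Arc<PartitionState> {
        let mut map = self.per_partition.lock().unwrap();
        // Create the per-partition state lazily on first use, then hand out
        // a cheap Arc clone so callers do not hold the lock.
        map.entry(partition_id)
            .or_insert_with(|| Arc::new(PartitionState::default()))
            .clone()
    }
}

fn main() {
    let registry = Registry::default();
    let a = registry.get_or_create(3);
    let b = registry.get_or_create(3);
    assert!(Arc::ptr_eq(&a, &b)); // the same state is reused
}
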
168 changes: 102 additions & 66 deletions datafusion/src/physical_plan/sorts/external_sort.rs
@@ -20,7 +20,6 @@
use super::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
};
use super::{RecordBatchStream, SendableRecordBatchStream, Statistics};
use crate::error::{DataFusionError, Result};
use crate::execution::disk_manager::{DiskManager, PathFile};
use crate::execution::memory_management::{
@@ -40,6 +39,7 @@ use crate::physical_plan::sort_preserving_merge::SortPreservingMergeStream;
use crate::physical_plan::sorts::in_mem_sort::InMemSortStream;
use crate::physical_plan::sorts::sort::sort_batch;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream;
use crate::physical_plan::stream::RecordBatchReceiverStream;
use crate::physical_plan::{
common, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
RecordBatchStream, SendableRecordBatchStream, Statistics,
@@ -48,22 +48,28 @@ use arrow::compute::aggregate::estimated_bytes_size;
use arrow::compute::{sort::lexsort_to_indices, take};
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::io::ipc::read::{read_file_metadata, FileReader};
use arrow::record_batch::RecordBatch;
use arrow::{array::ArrayRef, error::ArrowError};
use async_trait::async_trait;
use futures::channel::mpsc;
use futures::{Future, SinkExt, Stream, StreamExt};
use log::{debug, info};
use parking_lot::Mutex;
use log::{debug, error, info};
use pin_project_lite::pin_project;
use std::any::Any;
use std::fs::File;
use std::pin::Pin;
use std::sync::atomic::{AtomicIsize, AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::mpsc::{Receiver as TKReceiver, Sender as TKSender};
use tokio::task;

enum ExternalSortingState {
Insert,
OutputWithMem,
OutputAllDisk,
}

struct ExternalSorter {
id: MemoryConsumerId,
schema: SchemaRef,
@@ -74,6 +80,7 @@ struct ExternalSorter {
spilled_count: AtomicUsize,
/// Sort expressions
expr: Vec<PhysicalSortExpr>,
exec_state: ExternalSortingState,
runtime: RuntimeEnv,
}

@@ -93,46 +100,26 @@ impl ExternalSorter {
spilled_bytes: AtomicUsize::new(0),
spilled_count: AtomicUsize::new(0),
expr,
exec_state: ExternalSortingState::Insert,
runtime,
}
}

fn insert_batch(
&mut self,
input: RecordBatch,
schema: SchemaRef,
expr: &[PhysicalSortExpr],
) -> Result<()> {
let size = batch_memory_size(&input);
self.allocate(size)?;
// sort each batch as it's inserted, more probably to be cache-resident
let sorted_batch = sort_batch(input, schema, expr)?;
let mut in_mem_batches = self.in_mem_batches.lock();
in_mem_batches.push(sorted_batch);
}

fn sort(&self) {}
}

impl MemoryConsumer for ExternalSorter {
fn name(&self) -> String {
"ExternalSorter".to_owned()
}

fn id(&self) -> &MemoryConsumerId {
&self.id
fn output_with_mem(&mut self) {
assert_eq!(self.exec_state, ExternalSortingState::Insert);
self.exec_state = ExternalSortingState::OutputWithMem
}

fn memory_manager(&self) -> Arc<MemoryManager> {
self.runtime.memory_manager.clone()
fn spill_during_output(&mut self) {
assert_eq!(self.exec_state, ExternalSortingState::OutputWithMem);
self.exec_state = ExternalSortingState::OutputAllDisk
}

async fn spill(&self, _size: usize, _trigger: &dyn MemoryConsumer) -> Result<usize> {
let in_mem_batches = self.in_mem_batches.lock();

async fn spill_while_inserting(&self) -> usize {
let mut in_mem_batches = self.in_mem_batches.lock().unwrap();
// we could always get a chance to free some memory as long as we are holding some
if in_mem_batches.len() == 0 {
return Ok(0);
return 0;
}

info!(
@@ -156,13 +143,50 @@ impl MemoryConsumer for ExternalSorter {
.await;

spill(stream, path, self.schema.clone())?;
*in_mem_batches = vec![];

{
let mut spills = self.spills.lock();
let mut spills = self.spills.lock().unwrap();
self.spilled_count.fetch_add(1, Ordering::SeqCst);
self.spilled_bytes.fetch_add(total_size, Ordering::SeqCst);
spills.push(path);
}
total_size
}

fn insert_batch(
&mut self,
input: RecordBatch,
schema: SchemaRef,
expr: &[PhysicalSortExpr],
) -> Result<()> {
let size = batch_memory_size(&input);
self.allocate(size)?;
// sort each batch as it's inserted, more probably to be cache-resident
let sorted_batch = sort_batch(input, schema, expr)?;
let mut in_mem_batches = self.in_mem_batches.lock().unwrap();
in_mem_batches.push(sorted_batch);
}

fn sort(&self) {}
}

impl MemoryConsumer for ExternalSorter {
fn name(&self) -> String {
"ExternalSorter".to_owned()
}

fn id(&self) -> &MemoryConsumerId {
&self.id
}

fn memory_manager(&self) -> Arc<MemoryManager> {
self.runtime.memory_manager.clone()
}

async fn spill(&self, _size: usize, _trigger: &dyn MemoryConsumer) -> Result<usize> {
let total_size = self.spill_while_inserting().await;

Ok(total_size)
}

@@ -211,17 +235,39 @@ async fn spill(
path: String,
schema: SchemaRef,
) -> Result<()> {
let (mut sender, receiver): (Sender<RecordBatch>, Receiver<RecordBatch>) =
let (mut sender, receiver): (TKSender<RecordBatch>, TKReceiver<RecordBatch>) =
tokio::sync::mpsc::channel(2);
while let Some(item) = sorted_stream.next().await {
sender.send(item).await.ok();
}
task::spawn_blocking(move || write_sorted(receiver, path, schema));
let path_clone = path.clone();
task::spawn_blocking(move || {
if let Err(e) = write_sorted(receiver, path_clone, schema) {
error!("Failure while spilling to path {}. Error: {}", path, e);
}
});
Ok(())
}

async fn read_spill_as_stream(
path: String,
schema: SchemaRef,
) -> Result<SendableRecordBatchStream> {
let (mut sender, receiver): (
TKSender<ArrowResult<RecordBatch>>,
TKReceiver<ArrowResult<RecordBatch>>,
) = tokio::sync::mpsc::channel(2);
let path_clone = path.clone();
task::spawn_blocking(move || {
if let Err(e) = read_spill(sender, path_clone) {
error!("Failure while reading spill file: {}. Error: {}", path, e);
}
});
Ok(RecordBatchReceiverStream::create(&schema, receiver))
}

fn write_sorted(
mut receiver: Receiver<RecordBatch>,
mut receiver: TKReceiver<RecordBatch>,
path: String,
schema: SchemaRef,
) -> Result<()> {
@@ -237,25 +283,17 @@ fn write_sorted(
Ok(())
}

struct SpillableSortedStream {
id: MemoryConsumerId,
schema: SchemaRef,
in_mem_batches: Mutex<Vec<RecordBatch>>,
/// Sort expressions
expr: Vec<PhysicalSortExpr>,
runtime: RuntimeEnv,
}

impl SpillableSortedStream {
fn new() -> Self {
Self {}
fn read_spill(
mut sender: TKSender<ArrowResult<RecordBatch>>,
path: String,
) -> Result<()> {
let mut file = File::open(&path).map_err(|e| e.into())?;
let file_meta = read_file_metadata(&mut file).map_err(|e| from_arrow_err(&e))?;
let reader = FileReader::new(&mut file, file_meta, None);
for batch in reader {
sender.blocking_send(batch)?;
}

fn memory_used(&self) -> usize {}

fn get_sorted_stream(&self) {}

fn spill_remaining(&self) {}
Ok(())
}

/// Sort execution plan
@@ -402,14 +440,9 @@ impl ExecutionPlan for ExternalSortExec {
}
}

pin_project! {
/// stream for sort plan
struct ExternalSortStream {
#[pin]
output: futures::channel::oneshot::Receiver<ArrowResult<Option<RecordBatch>>>,
finished: bool,
schema: SchemaRef,
}
/// stream for sort plan
struct ExternalSortStream {
schema: SchemaRef,
}

impl ExternalSortStream {
@@ -492,6 +525,7 @@ mod tests {
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::expressions::col;
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::sorts::SortOptions;
use crate::physical_plan::{
collect,
csv::{CsvExec, CsvReadOptions},
@@ -661,3 +695,5 @@ mod tests {
Ok(())
}
}

impl ExternalSorter {}
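
The spill path added above moves file I/O off the async executor: spill files are written and re-read inside tokio::task::spawn_blocking, and results flow back to the async side over a small bounded tokio mpsc channel (blocking_send on the blocking thread, recv().await on the async side, with the receiver wrapped in RecordBatchReceiverStream). A minimal sketch of that handoff, with a toy Batch type standing in for RecordBatch and for the Arrow IPC FileReader (assumed names, not the commit's code):

use tokio::sync::mpsc;
use tokio::task;

// Stand-in for a deserialized RecordBatch; purely illustrative.
type Batch = Vec<u64>;

// Hypothetical blocking reader: in the commit this role is played by
// read_spill(), which walks an Arrow IPC FileReader over the spill file.
fn read_spill_blocking(sender: mpsc::Sender<Batch>) {
    for chunk in 0..3u64 {
        let batch = vec![chunk; 4]; // pretend this was decoded from disk
        // blocking_send is the synchronous counterpart of send().await and is
        // safe to call from inside spawn_blocking.
        if sender.blocking_send(batch).is_err() {
            break; // receiver dropped; stop reading
        }
    }
}

#[tokio::main]
async fn main() {
    // Small bounded channel, mirroring tokio::sync::mpsc::channel(2) in the diff.
    let (tx, mut rx) = mpsc::channel::<Batch>(2);

    // Keep file I/O off the async worker threads.
    task::spawn_blocking(move || read_spill_blocking(tx));

    // The async side drains batches as they arrive; the commit wraps the
    // receiver in RecordBatchReceiverStream instead of looping like this.
    while let Some(batch) = rx.recv().await {
        println!("got a batch of {} values", batch.len());
    }
}
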