Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX: Out of bounds error when inserting into MemTable with zero partitions #14011

Merged
merged 1 commit into from
Jan 6, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ impl TableProvider for MemTable {
input: Arc<dyn ExecutionPlan>,
insert_op: InsertOp,
) -> Result<Arc<dyn ExecutionPlan>> {
if self.batches.is_empty() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we allow scanning a MemTable with zero partitions, it seems appropriate to perform this check here.

return plan_err!("Cannot insert into MemTable with zero partitions.");
}

// If we are inserting into the table, any sort order may be messed up so reset it here
*self.sort_order.lock() = vec![];

Expand Down Expand Up @@ -333,7 +337,11 @@ impl DisplayAs for MemSink {
}

impl MemSink {
/// Creates a new [`MemSink`].
///
/// The caller is responsible for ensuring that there is at least one partition to insert into.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you could avoid the assert / panic here by changing the new method to try_new and return a Result so it could be an error if the batches is empty (and move the check to try_new)

However, I agree this PR is an improvement over what is currently on main so this PR is good to merge as is.

ALso I double checked and MemSink is not part of the Public API so this seems reasonable to me

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In hindsight this would be the better API, even though its just internal.
As I don't have access to a computer for now, I'd merge it like that and if the API ever becomes public we expose the error to the caller.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merged. Thanks @tobixdev . I will create a new PR to follow @alamb 's suggestion.

fn new(batches: Vec<PartitionData>) -> Self {
assert!(!batches.is_empty());
Self { batches }
}
}
Expand Down Expand Up @@ -779,4 +787,27 @@ mod tests {
assert_eq!(resulting_data_in_table[0].len(), 2);
Ok(())
}

// Test inserting a batch into a MemTable without any partitions
#[tokio::test]
async fn test_insert_into_zero_partition() -> Result<()> {
// Create a new schema with one field called "a" of type Int32
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

// Create a new batch of data to insert into the table
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)?;
// Run the experiment and expect an error
let experiment_result = experiment(schema, vec![], vec![vec![batch.clone()]])
.await
.unwrap_err();
// Ensure that there is a descriptive error message
assert_eq!(
"Error during planning: Cannot insert into MemTable with zero partitions.",
experiment_result.strip_backtrace()
);
Ok(())
}
}
Loading