diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index c1e0bea0b3ff..e8cf216ee9db 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -265,6 +265,10 @@ impl TableProvider for MemTable {
         input: Arc<dyn ExecutionPlan>,
         insert_op: InsertOp,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        if self.batches.is_empty() {
+            return plan_err!("Cannot insert into MemTable with zero partitions.");
+        }
+
         // If we are inserting into the table, any sort order may be messed up so reset it here
         *self.sort_order.lock() = vec![];
 
@@ -333,7 +337,11 @@ impl DisplayAs for MemSink {
 }
 
 impl MemSink {
+    /// Creates a new [`MemSink`].
+    ///
+    /// The caller is responsible for ensuring that there is at least one partition to insert into.
     fn new(batches: Vec<PartitionData>) -> Self {
+        assert!(!batches.is_empty());
         Self { batches }
     }
 }
@@ -779,4 +787,27 @@ mod tests {
         assert_eq!(resulting_data_in_table[0].len(), 2);
         Ok(())
     }
+
+    // Test inserting a batch into a MemTable without any partitions
+    #[tokio::test]
+    async fn test_insert_into_zero_partition() -> Result<()> {
+        // Create a new schema with one field called "a" of type Int32
+        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+
+        // Create a new batch of data to insert into the table
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
+        )?;
+        // Run the experiment and expect an error
+        let experiment_result = experiment(schema, vec![], vec![vec![batch.clone()]])
+            .await
+            .unwrap_err();
+        // Ensure that there is a descriptive error message
+        assert_eq!(
+            "Error during planning: Cannot insert into MemTable with zero partitions.",
+            experiment_result.strip_backtrace()
+        );
+        Ok(())
+    }
 }