From c1f3b3fd43a34382738afeb49fdceeffde4ef189 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 28 Jan 2023 14:43:22 -0800 Subject: [PATCH 1/3] rename create to write --- rust/pylance/src/dataset.rs | 2 +- rust/src/dataset.rs | 6 +++--- rust/src/io/exec/knn.rs | 2 +- rust/src/io/exec/limit.rs | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/rust/pylance/src/dataset.rs b/rust/pylance/src/dataset.rs index 07cd228165..4884ea026c 100644 --- a/rust/pylance/src/dataset.rs +++ b/rust/pylance/src/dataset.rs @@ -128,7 +128,7 @@ pub fn write_dataset(reader: &PyAny, uri: &str, options: &PyDict) -> PyResult, uri: &str, params: Option, @@ -505,7 +505,7 @@ mod tests { write_params.max_rows_per_file = 40; write_params.max_rows_per_group = 10; let mut reader: Box = Box::new(batches); - Dataset::create(&mut reader, test_uri, Some(write_params)) + Dataset::write(&mut reader, test_uri, Some(write_params)) .await .unwrap(); @@ -569,7 +569,7 @@ mod tests { let test_uri = test_dir.path().to_str().unwrap(); let mut reader: Box = Box::new(batches); - let dataset = Dataset::create(&mut reader, test_uri, None).await.unwrap(); + let dataset = Dataset::write(&mut reader, test_uri, None).await.unwrap(); let params = VectorIndexParams::default(); dataset diff --git a/rust/src/io/exec/knn.rs b/rust/src/io/exec/knn.rs index 6094b40246..72ba4381b6 100644 --- a/rust/src/io/exec/knn.rs +++ b/rust/src/io/exec/knn.rs @@ -212,7 +212,7 @@ mod tests { let q = as_fixed_size_list_array(&vector_arr).value(5); let mut reader: Box = Box::new(batches); - Dataset::create(&mut reader, test_uri, Some(write_params)) + Dataset::write(&mut reader, test_uri, Some(write_params)) .await .unwrap(); diff --git a/rust/src/io/exec/limit.rs b/rust/src/io/exec/limit.rs index 36b01c2f17..4ce847bc08 100644 --- a/rust/src/io/exec/limit.rs +++ b/rust/src/io/exec/limit.rs @@ -175,7 +175,7 @@ mod tests { let mut params = WriteParams::default(); params.max_rows_per_group = 10; let mut reader: Box = 
Box::new(batches); - Dataset::create(&mut reader, path, Some(params)) + Dataset::write(&mut reader, path, Some(params)) .await .unwrap(); expected_batches From f2bac1941d2150f8878a32d14dfc30505744d749 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 28 Jan 2023 15:48:23 -0800 Subject: [PATCH 2/3] add test --- rust/src/dataset.rs | 116 ++++++++++++++++++++++++++++++++---- rust/src/dataset/write.rs | 2 + rust/src/io/object_store.rs | 9 +++ 3 files changed, 116 insertions(+), 11 deletions(-) diff --git a/rust/src/dataset.rs b/rust/src/dataset.rs index 512a30b1ce..ae4c1e731a 100644 --- a/rust/src/dataset.rs +++ b/rust/src/dataset.rs @@ -118,7 +118,7 @@ impl Dataset { }) } - /// Create a new dataset with a stream of [RecordBatch]s. + /// Write to or Create a [Dataset] with a stream of [RecordBatch]s. /// /// Returns the newly created [`Dataset`]. Returns [Error] if the dataset already exists. pub async fn write( @@ -126,16 +126,18 @@ impl Dataset { uri: &str, params: Option, ) -> Result { - // 1. check the directory does not exist. let object_store = Arc::new(ObjectStore::new(uri)?); + let params = params.unwrap_or_default(); let latest_manifest_path = latest_manifest_path(object_store.base_path()); - match object_store.inner.head(&latest_manifest_path).await { - Ok(_) => return Err(Error::IO(format!("Dataset already exists: {uri}"))), - Err(object_store::Error::NotFound { path: _, source: _ }) => { /* we are good */ } - Err(e) => return Err(Error::from(e)), - } - let params = params.unwrap_or_default(); + let latest_manifest = if matches!(params.mode, WriteMode::Create) { + if object_store.exists(&latest_manifest_path).await? { + return Err(Error::IO(format!("Dataset already exists: {uri}"))); + } + None + } else { + Some(read_manifest(&object_store, &latest_manifest_path).await?) 
+ }; let mut peekable = batches.peekable(); let mut schema: Schema; @@ -152,8 +154,28 @@ impl Dataset { )); } - let mut fragment_id = 0; - let mut fragments: Vec = vec![]; + if matches!(params.mode, WriteMode::Append) { + if let Some(m) = latest_manifest.as_ref() { + if schema != m.schema { + return Err(Error::IO(format!( + "Append with different schema: original={} new={}", + m.schema, schema + ))); + } + } + } + + let mut fragment_id = latest_manifest.as_ref().map_or(0, |m| { + m.fragments + .iter() + .map(|f| f.id) + .max() + .map(|id| id + 1) + .unwrap_or(0) + }); + let mut fragments: Vec = latest_manifest + .as_ref() + .map_or(vec![], |m| m.fragments.as_ref().clone()); macro_rules! new_writer { () => {{ @@ -198,6 +220,7 @@ impl Dataset { }; let mut manifest = Manifest::new(&schema, Arc::new(fragments)); + manifest.version = latest_manifest.map_or(1, |m| m.version + 1); write_manifest_file(&object_store, &mut manifest, None).await?; let base = object_store.base_path().clone(); @@ -461,7 +484,7 @@ mod tests { use arrow_schema::{DataType, Field, Schema}; use arrow_select::take::take; use futures::stream::TryStreamExt; - + use prost_types::Struct; use tempfile::tempdir; #[tokio::test] @@ -544,6 +567,77 @@ mod tests { ) } + #[tokio::test] + async fn append_dataset() { + let test_dir = tempdir().unwrap(); + + let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])); + let batches = RecordBatchBuffer::new(vec![RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(0..20))], + ) + .unwrap()]); + + let test_uri = test_dir.path().to_str().unwrap(); + let mut write_params = WriteParams::default(); + write_params.max_rows_per_file = 40; + write_params.max_rows_per_group = 10; + let mut batches: Box = Box::new(batches); + Dataset::write(&mut batches, test_uri, Some(write_params)) + .await + .unwrap(); + + let batches = RecordBatchBuffer::new(vec![RecordBatch::try_new( + schema.clone(), + 
vec![Arc::new(Int32Array::from_iter_values(20..40))], + ) + .unwrap()]); + write_params.mode = WriteMode::Append; + let mut batches: Box = Box::new(batches); + Dataset::write(&mut batches, test_uri, Some(write_params)) + .await + .unwrap(); + + let expected_batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(0..40))], + ) + .unwrap(); + + let actual_ds = Dataset::open(test_uri).await.unwrap(); + assert_eq!(actual_ds.version().version, 2); + let actual_schema = Schema::from(actual_ds.schema()); + assert_eq!(&actual_schema, schema.as_ref()); + + let actual_batches = actual_ds + .scan() + .try_into_stream() + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + // sort + let actual_batch = concat_batches(&schema, &actual_batches).unwrap(); + let idx_arr = actual_batch.column_by_name("i").unwrap(); + let sorted_indices = sort_to_indices(idx_arr, None, None).unwrap(); + let struct_arr: StructArray = actual_batch.into(); + let sorted_arr = take(&struct_arr, &sorted_indices, None).unwrap(); + + let expected_struct_arr: StructArray = expected_batch.into(); + assert_eq!(&expected_struct_arr, as_struct_array(sorted_arr.as_ref())); + + // Each fragment has a different fragment ID + assert_eq!( + actual_ds + .fragments() + .iter() + .map(|f| f.id) + .collect::>(), + (0..10).collect::>() + ) + } + #[ignore] #[tokio::test] async fn test_create_index() { diff --git a/rust/src/dataset/write.rs b/rust/src/dataset/write.rs index 84ab7706e0..b83d481e10 100644 --- a/rust/src/dataset/write.rs +++ b/rust/src/dataset/write.rs @@ -16,6 +16,7 @@ // under the License. /// The mode to write dataset. +#[derive(Debug, Clone, Copy)] pub enum WriteMode { /// Create a new dataset. Expect the dataset does not exist. Create, @@ -26,6 +27,7 @@ pub enum WriteMode { } /// Dataset Write Parameters +#[derive(Debug, Clone, Copy)] pub struct WriteParams { /// Max number of records per file. 
pub max_rows_per_file: usize, diff --git a/rust/src/io/object_store.rs b/rust/src/io/object_store.rs index b1ff7d0735..4b6a7dc542 100644 --- a/rust/src/io/object_store.rs +++ b/rust/src/io/object_store.rs @@ -139,4 +139,13 @@ impl ObjectStore { pub async fn create(&self, path: &Path) -> Result { ObjectWriter::new(self, path).await } + + /// Check a file exists. + pub async fn exists(&self, path: &Path) -> Result { + match self.inner.head(path).await { + Ok(_) => Ok(true), + Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false), + Err(e) => Err(e.into()), + } + } } From 43e25e7a62dffe00b2f35ba2357cebf572b2364d Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Sat, 28 Jan 2023 15:50:12 -0800 Subject: [PATCH 3/3] support append --- rust/src/dataset.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rust/src/dataset.rs b/rust/src/dataset.rs index ae4c1e731a..268e7d4d31 100644 --- a/rust/src/dataset.rs +++ b/rust/src/dataset.rs @@ -484,7 +484,6 @@ mod tests { use arrow_schema::{DataType, Field, Schema}; use arrow_select::take::take; use futures::stream::TryStreamExt; - use prost_types::Struct; use tempfile::tempdir; #[tokio::test] @@ -634,7 +633,7 @@ mod tests { .iter() .map(|f| f.id) .collect::>(), - (0..10).collect::>() + (0..2).collect::>() ) }