[Rust] Minimal dataset append support #482

Merged 3 commits on Jan 29, 2023
Changes from all commits
2 changes: 1 addition & 1 deletion rust/pylance/src/dataset.rs
@@ -128,7 +128,7 @@ pub fn write_dataset(reader: &PyAny, uri: &str, options: &PyDict) -> PyResult<bool>
Box::new(ArrowArrayStreamReader::from_pyarrow(reader)?)
};

- LanceDataset::create(&mut batches, uri, params)
+ LanceDataset::write(&mut batches, uri, params)
.await
.map_err(|err| PyIOError::new_err(err.to_string()))?;
Ok(true)
121 changes: 107 additions & 14 deletions rust/src/dataset.rs
@@ -118,24 +118,26 @@ impl Dataset {
})
}

- /// Create a new dataset with a stream of [RecordBatch]s.
+ /// Write to or create a [Dataset] with a stream of [RecordBatch]s.
///
/// Returns the newly written [`Dataset`]. Returns [Error] if the dataset already exists
/// and the write mode is [WriteMode::Create].
- pub async fn create(
+ pub async fn write(
batches: &mut Box<dyn RecordBatchReader>,
uri: &str,
params: Option<WriteParams>,
) -> Result<Self> {
- // 1. check the directory does not exist.
let object_store = Arc::new(ObjectStore::new(uri)?);
- let params = params.unwrap_or_default();

let latest_manifest_path = latest_manifest_path(object_store.base_path());
- match object_store.inner.head(&latest_manifest_path).await {
- Ok(_) => return Err(Error::IO(format!("Dataset already exists: {uri}"))),
- Err(object_store::Error::NotFound { path: _, source: _ }) => { /* we are good */ }
- Err(e) => return Err(Error::from(e)),
- }
+ let params = params.unwrap_or_default();
+ let latest_manifest = if matches!(params.mode, WriteMode::Create) {
+ if object_store.exists(&latest_manifest_path).await? {
+ return Err(Error::IO(format!("Dataset already exists: {uri}")));
+ }
+ None
+ } else {
+ Some(read_manifest(&object_store, &latest_manifest_path).await?)
+ };

let mut peekable = batches.peekable();
let mut schema: Schema;
@@ -152,8 +154,28 @@ impl Dataset {
));
}

- let mut fragment_id = 0;
- let mut fragments: Vec<Fragment> = vec![];
+ if matches!(params.mode, WriteMode::Append) {
+ if let Some(m) = latest_manifest.as_ref() {
+ if schema != m.schema {
+ return Err(Error::IO(format!(
+ "Append with different schema: original={} new={}",
+ m.schema, schema
+ )));
+ }
+ }
+ }
+
+ let mut fragment_id = latest_manifest.as_ref().map_or(0, |m| {
+ m.fragments
+ .iter()
+ .map(|f| f.id)
+ .max()
+ .map(|id| id + 1)
+ .unwrap_or(0)
+ });
+ let mut fragments: Vec<Fragment> = latest_manifest
+ .as_ref()
+ .map_or(vec![], |m| m.fragments.as_ref().clone());

Review thread on the fragment_id computation:

Contributor: Would it make sense to put the max fragment ID in the manifest?

Contributor (author): The interesting part is how we would enforce it, i.e., whether to allow a background compaction thread to run while still accepting writes, or to hard-stop users from appending new fragments. This could be a question we need to answer once the scale goes up.
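A hypothetical sketch of the reviewer's suggestion above, not part of this PR: if the manifest tracked the highest fragment ID it has handed out (the max_fragment_id field below is invented), the next ID would come from a single field read instead of a scan over all fragments.

// Hypothetical: suppose Manifest carried `max_fragment_id: Option<u64>`.
// The next fragment ID would then be a field read rather than an O(n) scan:
let mut fragment_id = latest_manifest
    .as_ref()
    .and_then(|m| m.max_fragment_id) // invented field, not in this PR
    .map(|id| id + 1)
    .unwrap_or(0);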

macro_rules! new_writer {
() => {{
@@ -198,6 +220,7 @@
};

let mut manifest = Manifest::new(&schema, Arc::new(fragments));
+ manifest.version = latest_manifest.map_or(1, |m| m.version + 1);
write_manifest_file(&object_store, &mut manifest, None).await?;

let base = object_store.base_path().clone();
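The version bump above is what turns every successful write into a new dataset version. A minimal sketch of what a caller observes afterwards, mirroring the assertion in the append test below (test_uri as in the tests):

// Version 1 comes from the initial Create write, version 2 from the Append.
let ds = Dataset::open(test_uri).await?;
assert_eq!(ds.version().version, 2);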
@@ -461,7 +484,6 @@ mod tests {
use arrow_schema::{DataType, Field, Schema};
use arrow_select::take::take;
use futures::stream::TryStreamExt;

use tempfile::tempdir;

#[tokio::test]
@@ -505,7 +527,7 @@
write_params.max_rows_per_file = 40;
write_params.max_rows_per_group = 10;
let mut reader: Box<dyn RecordBatchReader> = Box::new(batches);
- Dataset::create(&mut reader, test_uri, Some(write_params))
+ Dataset::write(&mut reader, test_uri, Some(write_params))
.await
.unwrap();

@@ -544,6 +566,77 @@
)
}

#[tokio::test]
async fn append_dataset() {
let test_dir = tempdir().unwrap();

let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
let batches = RecordBatchBuffer::new(vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..20))],
)
.unwrap()]);

let test_uri = test_dir.path().to_str().unwrap();
let mut write_params = WriteParams::default();
write_params.max_rows_per_file = 40;
write_params.max_rows_per_group = 10;
let mut batches: Box<dyn RecordBatchReader> = Box::new(batches);
Dataset::write(&mut batches, test_uri, Some(write_params))
.await
.unwrap();

let batches = RecordBatchBuffer::new(vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(20..40))],
)
.unwrap()]);
write_params.mode = WriteMode::Append;
let mut batches: Box<dyn RecordBatchReader> = Box::new(batches);
Dataset::write(&mut batches, test_uri, Some(write_params))
.await
.unwrap();

let expected_batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..40))],
)
.unwrap();

let actual_ds = Dataset::open(test_uri).await.unwrap();
assert_eq!(actual_ds.version().version, 2);
let actual_schema = Schema::from(actual_ds.schema());
assert_eq!(&actual_schema, schema.as_ref());

let actual_batches = actual_ds
.scan()
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
// Sort by column `i` so the comparison does not depend on row order across fragments.
let actual_batch = concat_batches(&schema, &actual_batches).unwrap();
let idx_arr = actual_batch.column_by_name("i").unwrap();
let sorted_indices = sort_to_indices(idx_arr, None, None).unwrap();
let struct_arr: StructArray = actual_batch.into();
let sorted_arr = take(&struct_arr, &sorted_indices, None).unwrap();

let expected_struct_arr: StructArray = expected_batch.into();
assert_eq!(&expected_struct_arr, as_struct_array(sorted_arr.as_ref()));

// Each fragment has a distinct fragment ID.
assert_eq!(
actual_ds
.fragments()
.iter()
.map(|f| f.id)
.collect::<Vec<_>>(),
(0..2).collect::<Vec<_>>()
)
}

#[ignore]
#[tokio::test]
async fn test_create_index() {
@@ -569,7 +662,7 @@
let test_uri = test_dir.path().to_str().unwrap();

let mut reader: Box<dyn RecordBatchReader> = Box::new(batches);
- let dataset = Dataset::create(&mut reader, test_uri, None).await.unwrap();
+ let dataset = Dataset::write(&mut reader, test_uri, None).await.unwrap();

let params = VectorIndexParams::default();
dataset
2 changes: 2 additions & 0 deletions rust/src/dataset/write.rs
@@ -16,6 +16,7 @@
// under the License.

/// The mode for writing a dataset.
+ #[derive(Debug, Clone, Copy)]
pub enum WriteMode {
/// Create a new dataset. Expects that the dataset does not exist.
Create,
@@ -26,6 +27,7 @@ pub enum WriteMode {
}

/// Dataset Write Parameters
+ #[derive(Debug, Clone, Copy)]
pub struct WriteParams {
/// Max number of records per file.
pub max_rows_per_file: usize,
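With Clone and Copy derived, a WriteParams value can be mutated and reused across writes, as the append test above does. A minimal sketch of opting into append mode (dataset setup and error handling elided; uri and batches are assumed to exist):

let mut params = WriteParams::default();
params.mode = WriteMode::Append;
let mut reader: Box<dyn RecordBatchReader> = Box::new(batches);
Dataset::write(&mut reader, uri, Some(params)).await?;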
2 changes: 1 addition & 1 deletion rust/src/io/exec/knn.rs
@@ -212,7 +212,7 @@ mod tests {
let q = as_fixed_size_list_array(&vector_arr).value(5);

let mut reader: Box<dyn RecordBatchReader> = Box::new(batches);
- Dataset::create(&mut reader, test_uri, Some(write_params))
+ Dataset::write(&mut reader, test_uri, Some(write_params))
.await
.unwrap();

2 changes: 1 addition & 1 deletion rust/src/io/exec/limit.rs
@@ -175,7 +175,7 @@ mod tests {
let mut params = WriteParams::default();
params.max_rows_per_group = 10;
let mut reader: Box<dyn RecordBatchReader> = Box::new(batches);
- Dataset::create(&mut reader, path, Some(params))
+ Dataset::write(&mut reader, path, Some(params))
.await
.unwrap();
expected_batches

Review thread on this line:

Contributor: I think pyo3 will break on the name change. We don't have GitHub Actions coverage for this yet, which is annoying; I will try to add it tonight.

Contributor (author): Oh, that's nice. Thanks.
9 changes: 9 additions & 0 deletions rust/src/io/object_store.rs
@@ -139,4 +139,13 @@ impl ObjectStore {
pub async fn create(&self, path: &Path) -> Result<ObjectWriter> {
ObjectWriter::new(self, path).await
}

/// Check whether a file exists.
pub async fn exists(&self, path: &Path) -> Result<bool> {
match self.inner.head(path).await {
Ok(_) => Ok(true),
Err(object_store::Error::NotFound { path: _, source: _ }) => Ok(false),
Err(e) => Err(e.into()),
}
}
}
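For reference, this helper is what Dataset::write now uses for its Create-mode existence check; a minimal sketch of the same pattern, reusing the names from the dataset code above:

let object_store = Arc::new(ObjectStore::new(uri)?);
let latest_manifest_path = latest_manifest_path(object_store.base_path());
if object_store.exists(&latest_manifest_path).await? {
    return Err(Error::IO(format!("Dataset already exists: {uri}")));
}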