From d174c7ee49dfd3444bc50bfd5669c0ca1006ff93 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sat, 9 Apr 2022 17:36:49 -0700 Subject: [PATCH 01/14] Sketch out write operations --- data-access/src/object_store/local.rs | 33 +++++++++++++++++++++++++-- data-access/src/object_store/mod.rs | 14 +++++++++++- 2 files changed, 44 insertions(+), 3 deletions(-) diff --git a/data-access/src/object_store/local.rs b/data-access/src/object_store/local.rs index f4872ae17420..ab5b2b35fc63 100644 --- a/data-access/src/object_store/local.rs +++ b/data-access/src/object_store/local.rs @@ -18,17 +18,19 @@ //! Object store that represents the Local File System. use std::fs::{self, File, Metadata}; -use std::io; +use std::io::{self, Write}; use std::io::{BufReader, Read, Seek, SeekFrom}; use std::sync::Arc; use async_trait::async_trait; use futures::{stream, AsyncRead, StreamExt}; +use tokio::io::AsyncWrite; +use tokio::fs::File as AsyncFile; use crate::{FileMeta, Result, SizedFile}; use super::{ - FileMetaStream, ListEntryStream, ObjectReader, ObjectReaderStream, ObjectStore, + FileMetaStream, ListEntryStream, ObjectReader, ObjectWriter, ObjectReaderStream, ObjectStore, }; pub static LOCAL_SCHEME: &str = "file"; @@ -59,6 +61,10 @@ impl ObjectStore for LocalFileSystem { fn file_reader(&self, file: SizedFile) -> Result> { Ok(Arc::new(LocalFileReader::new(file)?)) } + + fn file_writer(&self, file: SizedFile) -> Result> { + Ok(Arc::new(LocalFileWriter::new(file)?)) + } } struct LocalFileReader { @@ -189,6 +195,29 @@ pub fn local_unpartitioned_file(file: String) -> FileMeta { } } +struct LocalFileWriter { + file: SizedFile +} + +impl LocalFileWriter { + fn new(file: SizedFile) -> Result { + Ok(Self { file }) + } +} + +#[async_trait] +impl ObjectWriter for LocalFileWriter { + async fn writer(&self) -> Result> { + let file = AsyncFile::open(&self.file.path).await?; + Ok(Box::new(file)) + } + + fn sync_writer(&self) -> Result> { + let file = File::open(&self.file.path)?; + Ok(Box::new(file)) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index 5d2f76e27931..6a84a19f878e 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -20,12 +20,13 @@ pub mod local; use std::fmt::Debug; -use std::io::Read; +use std::io::{Read, Write}; use std::pin::Pin; use std::sync::Arc; use async_trait::async_trait; use futures::{AsyncRead, Stream, StreamExt}; +use tokio::io::AsyncWrite; use crate::{FileMeta, ListEntry, Result, SizedFile}; @@ -67,6 +68,14 @@ pub trait ObjectReader: Send + Sync { fn length(&self) -> u64; } +/// Object Writer for one file in an object store. +#[async_trait] +pub trait ObjectWriter: Send + Sync { + async fn writer(&self) -> Result>; + + fn sync_writer(&self) -> Result>; +} + /// A ObjectStore abstracts access to an underlying file/object storage. /// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes #[async_trait] @@ -101,4 +110,7 @@ pub trait ObjectStore: Sync + Send + Debug { /// Get object reader for one file fn file_reader(&self, file: SizedFile) -> Result>; + + /// Get object writer for one file + fn file_writer(&self, file: SizedFile) -> Result>; } From b501ee55dfe1ebebd42dbf8a2cf70127edb3e43d Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 15 Apr 2022 19:25:32 -0700 Subject: [PATCH 02/14] It doesn't makes sense for writer to take SizedFile --- data-access/src/object_store/local.rs | 29 ++++++++++++++++----------- data-access/src/object_store/mod.rs | 2 +- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/data-access/src/object_store/local.rs b/data-access/src/object_store/local.rs index ab5b2b35fc63..7a92893210b5 100644 --- a/data-access/src/object_store/local.rs +++ b/data-access/src/object_store/local.rs @@ -35,6 +35,14 @@ use super::{ pub static LOCAL_SCHEME: &str = "file"; +fn path_without_scheme(full_path: &str) -> &str { + if let Some((_scheme, path)) = full_path.split_once("://") { + path + } else { + full_path + } +} + #[derive(Debug)] /// Local File System as Object Store. pub struct LocalFileSystem; @@ -42,11 +50,7 @@ pub struct LocalFileSystem; #[async_trait] impl ObjectStore for LocalFileSystem { async fn list_file(&self, prefix: &str) -> Result { - let prefix = if let Some((_scheme, path)) = prefix.split_once("://") { - path - } else { - prefix - }; + let prefix = path_without_scheme(prefix); list_all(prefix.to_owned()).await } @@ -62,8 +66,9 @@ impl ObjectStore for LocalFileSystem { Ok(Arc::new(LocalFileReader::new(file)?)) } - fn file_writer(&self, file: SizedFile) -> Result> { - Ok(Arc::new(LocalFileWriter::new(file)?)) + fn file_writer(&self, path: String) -> Result> { + let path = path_without_scheme(&path).to_string(); + Ok(Arc::new(LocalFileWriter::new(path)?)) } } @@ -196,24 +201,24 @@ pub fn local_unpartitioned_file(file: String) -> FileMeta { } struct LocalFileWriter { - file: SizedFile + path: String } impl LocalFileWriter { - fn new(file: SizedFile) -> Result { - Ok(Self { file }) + fn new(path: String) -> Result { + Ok(Self { path }) } } #[async_trait] impl ObjectWriter for LocalFileWriter { async fn writer(&self) -> Result> { - let file = AsyncFile::open(&self.file.path).await?; + let file = AsyncFile::open(&self.path).await?; Ok(Box::new(file)) } fn sync_writer(&self) -> Result> { - let file = File::open(&self.file.path)?; + let file = File::open(&self.path)?; Ok(Box::new(file)) } } diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index 6a84a19f878e..613989ed76ff 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -112,5 +112,5 @@ pub trait ObjectStore: Sync + Send + Debug { fn file_reader(&self, file: SizedFile) -> Result>; /// Get object writer for one file - fn file_writer(&self, file: SizedFile) -> Result>; + fn file_writer(&self, path: String) -> Result>; } From c8f28aa2d8fcdf23b1fc0730c8fe4d2182cf3104 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 15 Apr 2022 19:57:24 -0700 Subject: [PATCH 03/14] It needs to be pin --- data-access/src/object_store/local.rs | 5 +++-- data-access/src/object_store/mod.rs | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/data-access/src/object_store/local.rs b/data-access/src/object_store/local.rs index 7a92893210b5..7607794d365d 100644 --- a/data-access/src/object_store/local.rs +++ b/data-access/src/object_store/local.rs @@ -20,6 +20,7 @@ use std::fs::{self, File, Metadata}; use std::io::{self, Write}; use std::io::{BufReader, Read, Seek, SeekFrom}; +use std::pin::Pin; use std::sync::Arc; use async_trait::async_trait; @@ -212,9 +213,9 @@ impl LocalFileWriter { #[async_trait] impl ObjectWriter for LocalFileWriter { - async fn writer(&self) -> Result> { + async fn writer(&self) -> Result>> { let file = AsyncFile::open(&self.path).await?; - Ok(Box::new(file)) + Ok(Box::pin(file)) } fn sync_writer(&self) -> Result> { diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index 613989ed76ff..7f1c0949e55f 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -71,7 +71,7 @@ pub trait ObjectReader: Send + Sync { /// Object Writer for one file in an object store. #[async_trait] pub trait ObjectWriter: Send + Sync { - async fn writer(&self) -> Result>; + async fn writer(&self) -> Result>>; fn sync_writer(&self) -> Result>; } From e9c52c969a767afc139c72a2c97669780468eade Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sat, 16 Apr 2022 08:40:22 -0700 Subject: [PATCH 04/14] Add path_without_scheme to public API --- data-access/src/object_store/local.rs | 9 +-------- data-access/src/object_store/mod.rs | 17 +++++++++++++++++ 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/data-access/src/object_store/local.rs b/data-access/src/object_store/local.rs index 7607794d365d..66a5621377db 100644 --- a/data-access/src/object_store/local.rs +++ b/data-access/src/object_store/local.rs @@ -32,18 +32,11 @@ use crate::{FileMeta, Result, SizedFile}; use super::{ FileMetaStream, ListEntryStream, ObjectReader, ObjectWriter, ObjectReaderStream, ObjectStore, + path_without_scheme }; pub static LOCAL_SCHEME: &str = "file"; -fn path_without_scheme(full_path: &str) -> &str { - if let Some((_scheme, path)) = full_path.split_once("://") { - path - } else { - full_path - } -} - #[derive(Debug)] /// Local File System as Object Store. pub struct LocalFileSystem; diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index 7f1c0949e55f..27413e717ca4 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -114,3 +114,20 @@ pub trait ObjectStore: Sync + Send + Debug { /// Get object writer for one file fn file_writer(&self, path: String) -> Result>; } + +// TODO: Document below when we do and do not expect a scheme +/// Return path without scheme +/// +/// # Examples +/// +/// ``` +/// let path = "file://path/to/object"; +/// assert_eq(path_without_scheme(path), "path/to/object"); +/// ``` +pub fn path_without_scheme(full_path: &str) -> &str { + if let Some((_scheme, path)) = full_path.split_once("://") { + path + } else { + full_path + } +} \ No newline at end of file From 7c193853d2885767526ad71ae5fd1bbde4617c26 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 17 Apr 2022 13:57:08 -0700 Subject: [PATCH 05/14] Get all but copy working --- data-access/Cargo.toml | 2 +- data-access/src/object_store/local.rs | 323 ++++++++++++++++++++++++-- data-access/src/object_store/mod.rs | 40 +++- 3 files changed, 344 insertions(+), 21 deletions(-) diff --git a/data-access/Cargo.toml b/data-access/Cargo.toml index aaa869f0ab86..f94c08c3b4c8 100644 --- a/data-access/Cargo.toml +++ b/data-access/Cargo.toml @@ -34,7 +34,7 @@ path = "src/lib.rs" [dependencies] async-trait = "0.1.41" -chrono = { version = "0.4", default-features = false } +chrono = { version = "0.4"} futures = "0.3" parking_lot = "0.12" tempfile = "3" diff --git a/data-access/src/object_store/local.rs b/data-access/src/object_store/local.rs index 66a5621377db..d2cf93d9d151 100644 --- a/data-access/src/object_store/local.rs +++ b/data-access/src/object_store/local.rs @@ -18,21 +18,20 @@ //! Object store that represents the Local File System. use std::fs::{self, File, Metadata}; -use std::io::{self, Write}; -use std::io::{BufReader, Read, Seek, SeekFrom}; +use std::io::{self, BufReader, Read, Seek, SeekFrom, Write}; use std::pin::Pin; use std::sync::Arc; use async_trait::async_trait; +use chrono::{DateTime, Utc}; use futures::{stream, AsyncRead, StreamExt}; -use tokio::io::AsyncWrite; -use tokio::fs::File as AsyncFile; +use tokio::{fs::File as AsyncFile, io::AsyncWrite}; use crate::{FileMeta, Result, SizedFile}; use super::{ - FileMetaStream, ListEntryStream, ObjectReader, ObjectWriter, ObjectReaderStream, ObjectStore, - path_without_scheme + path_without_scheme, FileMetaStream, ListEntryStream, ObjectReader, + ObjectReaderStream, ObjectStore, ObjectWriter, }; pub static LOCAL_SCHEME: &str = "file"; @@ -60,10 +59,66 @@ impl ObjectStore for LocalFileSystem { Ok(Arc::new(LocalFileReader::new(file)?)) } - fn file_writer(&self, path: String) -> Result> { - let path = path_without_scheme(&path).to_string(); + fn file_writer(&self, path: &str) -> Result> { + let path = path_without_scheme(path).to_string(); Ok(Arc::new(LocalFileWriter::new(path)?)) } + + async fn create_dir(&self, path: &str, recursive: bool) -> Result<()> { + let res = match recursive { + false => tokio::fs::create_dir(path).await, + true => tokio::fs::create_dir_all(path).await, + }; + match res { + Ok(()) => Ok(()), + Err(ref e) if e.kind() == io::ErrorKind::AlreadyExists => Ok(()), + Err(err) => Err(err), + } + } + + async fn remove_dir_all(&self, path: &str) -> Result<()> { + tokio::fs::remove_dir_all(path).await + } + + async fn remove_dir_contents(&self, path: &str) -> Result<()> { + let mut entries = tokio::fs::read_dir(path).await?; + while let Some(entry) = entries.next_entry().await? { + if entry.file_type().await?.is_dir() { + self.remove_dir_all(entry.path().to_str().unwrap()).await?; + } else { + self.remove_file(entry.path().to_str().unwrap()).await?; + } + } + Ok(()) + } + + async fn remove_file(&self, path: &str) -> Result<()> { + let res = tokio::fs::remove_file(path).await; + match res { + Ok(()) => Ok(()), + Err(e) if e.kind() == io::ErrorKind::PermissionDenied => { + // If path is a directory, we should return InvalidInput instead + if tokio::fs::metadata(path).await?.is_dir() { + Err(io::Error::new( + io::ErrorKind::InvalidInput, + "was a directory", + )) + } else { + Err(e) + } + } + Err(err) => Err(err), + } + } + + async fn rename(&self, source: &str, dest: &str) -> Result<()> { + tokio::fs::rename(source, dest).await + } + + async fn copy(&self, source: &str, dest: &str) -> Result<()> { + tokio::fs::copy(source, dest).await?; + Ok(()) + } } struct LocalFileReader { @@ -190,12 +245,12 @@ pub fn local_unpartitioned_file(file: String) -> FileMeta { size: metadata.len(), path: file, }, - last_modified: metadata.modified().map(chrono::DateTime::from).ok(), + last_modified: metadata.modified().map(DateTime::::from).ok(), } } struct LocalFileWriter { - path: String + path: String, } impl LocalFileWriter { @@ -222,8 +277,8 @@ mod tests { use super::*; use futures::StreamExt; use std::collections::HashSet; - use std::fs::create_dir; use std::fs::File; + use std::fs::{create_dir, read_dir}; use tempfile::tempdir; #[tokio::test] @@ -231,10 +286,11 @@ mod tests { // tmp/a.txt // tmp/x/b.txt // tmp/y/c.txt - let tmp = tempdir()?; - let x_path = tmp.path().join("x"); - let y_path = tmp.path().join("y"); - let a_path = tmp.path().join("a.txt"); + let tmp_dir = tempdir()?; + let tmp = tmp_dir.path(); + let x_path = tmp.join("x"); + let y_path = tmp.join("y"); + let a_path = tmp.join("a.txt"); let b_path = x_path.join("b.txt"); let c_path = y_path.join("c.txt"); create_dir(&x_path)?; @@ -244,7 +300,7 @@ mod tests { File::create(&c_path)?; let mut all_files = HashSet::new(); - let mut files = list_all(tmp.path().to_str().unwrap().to_string()).await?; + let mut files = list_all(tmp.to_str().unwrap().to_string()).await?; while let Some(file) = files.next().await { let file = file?; assert_eq!(file.size(), 0); @@ -258,4 +314,239 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_create_dir() -> Result<()> { + let tmp_dir = tempdir()?; + let tmp = tmp_dir.path(); + + let fs = LocalFileSystem; + + // Create directory succeeds + let z_path = tmp.join("z"); + fs.create_dir(z_path.to_str().unwrap(), false).await?; + assert!(z_path.exists()); + + // Create recursive directory succeeds + let rec_path = tmp.join("w").join("a"); + fs.create_dir(rec_path.to_str().unwrap(), true).await?; + assert!(rec_path.exists()); + + // Returns Ok if already exists + fs.create_dir(tmp.to_str().unwrap(), false).await?; + + Ok(()) + } + + #[tokio::test] + async fn test_remove_dir() -> Result<()> { + // tmp/a.txt + // tmp/x/b.txt + let tmp_dir = tempdir()?; + let tmp = tmp_dir.path(); + let x_path = tmp.join("x"); + let a_path = tmp.join("a.txt"); + let b_path = x_path.join("b.txt"); + create_dir(&x_path)?; + File::create(&a_path)?; + File::create(&b_path)?; + + let fs = LocalFileSystem; + + // Delete contents tmp means tmp is empty + fs.remove_dir_contents(tmp.to_str().unwrap()).await?; + assert!(tmp.exists()); + assert_eq!(read_dir(tmp)?.count(), 0); + + // Delete tmp means no tmp + fs.remove_dir_all(tmp.to_str().unwrap()).await?; + assert!(!tmp.exists()); + + Ok(()) + } + + #[tokio::test] + async fn test_remove_file() -> Result<()> { + // tmp/a.txt + // tmp/x + let tmp_dir = tempdir()?; + let tmp = tmp_dir.path(); + let x_path = tmp.join("x"); + let a_path = tmp.join("a.txt"); + create_dir(&x_path)?; + File::create(&a_path)?; + + let fs = LocalFileSystem; + + // Delete existing file works + fs.remove_file(a_path.to_str().unwrap()).await?; + assert!(!a_path.exists()); + + // Delete non-existent file errors + let res = fs + .remove_file(tmp.join("missing.txt").to_str().unwrap()) + .await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::NotFound); + + // Delete file on directory errors + let res = fs.remove_file(x_path.to_str().unwrap()).await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::InvalidInput); + + Ok(()) + } + + #[tokio::test] + async fn test_rename() -> Result<()> { + // tmp/a.txt + // tmp/b.txt + // tmp/x/b.txt + // tmp/empty/ + // tmp/y/c.txt + // tmp/y/z/d.txt + let tmp_dir = tempdir()?; + let tmp = tmp_dir.path(); + let x_path = tmp.join("x"); + let empty_path = tmp.join("empty"); + let y_path = tmp.join("y"); + let z_path = y_path.join("z"); + let a_path = tmp.join("a.txt"); + let b_path = tmp.join("b.txt"); + let x_b_path = x_path.join("b.txt"); + let c_path = y_path.join("c.txt"); + let d_path = z_path.join("d.txt"); + create_dir(&x_path)?; + create_dir(&empty_path)?; + create_dir(&y_path)?; + create_dir(&z_path)?; + File::create(&a_path)?; + File::create(&b_path)?; + File::create(&x_b_path)?; + File::create(&c_path)?; + File::create(&d_path)?; + + let fs = LocalFileSystem; + + // Can rename a file, and it will exist at dest and not at source + let a2_path = tmp.join("a2.txt"); + fs.rename(a_path.to_str().unwrap(), a2_path.to_str().unwrap()) + .await?; + assert!(!a_path.exists()); + assert!(a2_path.exists()); + + // rename replaces files + let test_content = b"test"; + let mut f = File::create(&a_path)?; + f.write(test_content)?; + f.flush()?; + fs.rename(a_path.to_str().unwrap(), b_path.to_str().unwrap()) + .await?; + assert!(!a_path.exists()); + assert!(b_path.exists()); + let mut f = File::open(&b_path)?; + let mut actual_content = Vec::new(); + f.read_to_end(&mut actual_content)?; + assert_eq!(actual_content, test_content); + + // Can rename a directory, and it will recursively copy contents + let dest_path = tmp.join("v"); + fs.rename(y_path.to_str().unwrap(), dest_path.to_str().unwrap()) + .await?; + assert!(!y_path.exists()); + assert!(dest_path.exists()); + assert!(dest_path.join("c.txt").exists()); + assert!(dest_path.join("z").join("d.txt").exists()); + + // rename errors if it would overwrite non-empty directory + let res = fs + .rename(dest_path.to_str().unwrap(), x_path.to_str().unwrap()) + .await; + assert!(res.is_err()); + // We cannot test for specific error. See: https://diziet.dreamwidth.org/9894.html + + // rename succeeds if it would overwrite an empty directory + fs.rename(dest_path.to_str().unwrap(), empty_path.to_str().unwrap()) + .await?; + + Ok(()) + } + + #[tokio::test] + async fn test_copy() -> Result<()> { + // tmp/a.txt + // tmp/b.txt + // tmp/x/b.txt + // tmp/empty/ + // tmp/y/c.txt + // tmp/y/z/d.txt + let tmp_dir = tempdir()?; + let tmp = tmp_dir.path(); + let x_path = tmp.join("x"); + let empty_path = tmp.join("empty"); + let y_path = tmp.join("y"); + let z_path = y_path.join("z"); + let a_path = tmp.join("a.txt"); + let b_path = tmp.join("b.txt"); + let x_b_path = x_path.join("b.txt"); + let c_path = y_path.join("c.txt"); + let d_path = z_path.join("d.txt"); + create_dir(&x_path)?; + create_dir(&empty_path)?; + create_dir(&y_path)?; + create_dir(&z_path)?; + File::create(&a_path)?; + File::create(&b_path)?; + File::create(&x_b_path)?; + File::create(&c_path)?; + File::create(&d_path)?; + + let fs = LocalFileSystem; + + // Can copy a file, and it will exist at dest and source + let a2_path = tmp.join("a2.txt"); + fs.copy(a_path.to_str().unwrap(), a2_path.to_str().unwrap()) + .await?; + assert!(a_path.exists()); + assert!(a2_path.exists()); + + // Copy replaces files + let test_content = b"test"; + let mut f = File::create(&a_path)?; + f.write(test_content)?; + f.flush()?; + fs.copy(a_path.to_str().unwrap(), b_path.to_str().unwrap()) + .await?; + assert!(a_path.exists()); + assert!(b_path.exists()); + let mut f = File::open(&b_path)?; + let mut actual_content = Vec::new(); + f.read_to_end(&mut actual_content)?; + assert_eq!(actual_content, test_content); + + // Can copy a directory, and it will recursively copy contents + let dest_path = tmp.join("v"); + fs.copy(y_path.to_str().unwrap(), dest_path.to_str().unwrap()) + .await?; + assert!(y_path.exists()); + assert!(dest_path.exists()); + assert!(dest_path.join("c.txt").exists()); + assert!(dest_path.join("z").join("d.txt").exists()); + + // Copy errors if it would overwrite a non-empty directory + let res = fs + .copy(dest_path.to_str().unwrap(), x_path.to_str().unwrap()) + .await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::AlreadyExists); + + // Copy errors if it would overwrite a non-empty directory + let res = fs + .copy(dest_path.to_str().unwrap(), empty_path.to_str().unwrap()) + .await; + assert!(res.is_err()); + assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::AlreadyExists); + + Ok(()) + } } diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index 27413e717ca4..f35028425395 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -112,14 +112,46 @@ pub trait ObjectStore: Sync + Send + Debug { fn file_reader(&self, file: SizedFile) -> Result>; /// Get object writer for one file - fn file_writer(&self, path: String) -> Result>; + fn file_writer(&self, path: &str) -> Result>; + + /// Create directory, recursively if requested + /// + /// If directory already exists, will return Ok + async fn create_dir(&self, path: &str, recursive: bool) -> Result<()>; + + /// Delete directory and its contents, recursively + async fn remove_dir_all(&self, path: &str) -> Result<()>; + + /// Delete directory contents recursively + /// + /// Unlike [delete_dir], will not delete directory itself + async fn remove_dir_contents(&self, path: &str) -> Result<()>; + + /// Delete a file + /// + /// If file does not exist, will return error kind [std::io::ErrorKind::NotFound] + /// If attempted on a directory, will return error kind [std::io::ErrorKind::InvalidInput] + async fn remove_file(&self, path: &str) -> Result<()>; + + /// Rename a file or directory + /// + /// If dest exists, source will replace it unless dest is a non-empty directory, + /// in which case an [std::io::ErrorKind::AlreadyExists] or + /// [std::io::ErrorKind::DirectoryNotEmpty] error will be returned + async fn rename(&self, source: &str, dest: &str) -> Result<()>; + + /// Copy a file or directory + /// + /// If the destination exists and is a directory, an error is returned. + /// Otherwise, it is replaced. + async fn copy(&self, source: &str, dest: &str) -> Result<()>; } // TODO: Document below when we do and do not expect a scheme /// Return path without scheme -/// +/// /// # Examples -/// +/// /// ``` /// let path = "file://path/to/object"; /// assert_eq(path_without_scheme(path), "path/to/object"); @@ -130,4 +162,4 @@ pub fn path_without_scheme(full_path: &str) -> &str { } else { full_path } -} \ No newline at end of file +} From 92075698410b61102d254cdfcbe10e7e6daa5eb4 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 17 Apr 2022 15:35:18 -0700 Subject: [PATCH 06/14] Get copy working --- data-access/src/object_store/local.rs | 52 ++++++++++++++++++++++++++- data-access/src/object_store/mod.rs | 3 +- 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/data-access/src/object_store/local.rs b/data-access/src/object_store/local.rs index d2cf93d9d151..3c9b06d7143a 100644 --- a/data-access/src/object_store/local.rs +++ b/data-access/src/object_store/local.rs @@ -19,6 +19,7 @@ use std::fs::{self, File, Metadata}; use std::io::{self, BufReader, Read, Seek, SeekFrom, Write}; +use std::path::PathBuf; use std::pin::Pin; use std::sync::Arc; @@ -116,7 +117,56 @@ impl ObjectStore for LocalFileSystem { } async fn copy(&self, source: &str, dest: &str) -> Result<()> { - tokio::fs::copy(source, dest).await?; + let source_path = PathBuf::from(source); + let dest_path = PathBuf::from(dest); + + if !source_path.exists() { + return Err(io::Error::new( + io::ErrorKind::NotFound, + "Source path not found", + )); + } + + if dest_path.exists() && dest_path.is_dir() { + return Err(io::Error::new( + io::ErrorKind::AlreadyExists, + "Cannot overwrite an existing directory.", + )); + } + + if source_path.is_file() { + tokio::fs::copy(source, dest).await?; + return Ok(()); + } + + self.create_dir(dest_path.clone().to_str().unwrap(), true) + .await?; + + let mut stack = Vec::new(); + stack.push(source_path.clone()); + + let source_root = PathBuf::from(source_path); + let dest_root = PathBuf::from(dest_path); + + while let Some(working_path) = stack.pop() { + let mut entries = tokio::fs::read_dir(working_path.clone()).await?; + + let working_dest = + dest_root.join(working_path.strip_prefix(&source_root).unwrap()); + self.create_dir(working_dest.to_str().unwrap(), true) + .await?; + + while let Some(entry) = entries.next_entry().await? { + if entry.path().is_file() { + let entry_dest = + dest_root.join(entry.path().strip_prefix(&source_root).unwrap()); + tokio::fs::copy(entry.path(), entry_dest.clone()).await?; + } else { + stack.push(entry.path()); + } + } + } + Ok(()) } } diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index f35028425395..0db3bca88272 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -153,8 +153,9 @@ pub trait ObjectStore: Sync + Send + Debug { /// # Examples /// /// ``` +/// use datafusion_data_access::object_store::path_without_scheme; /// let path = "file://path/to/object"; -/// assert_eq(path_without_scheme(path), "path/to/object"); +/// assert_eq!(path_without_scheme(path), "path/to/object"); /// ``` pub fn path_without_scheme(full_path: &str) -> &str { if let Some((_scheme, path)) = full_path.split_once("://") { From 2fa7666ce8cfbe68a456ae12087f4478586663ee Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 18 Apr 2022 20:50:23 -0700 Subject: [PATCH 07/14] Update lint issues and tests --- .../rust/core/src/serde/logical_plan/mod.rs | 51 ++++++++++++++++++- data-access/src/object_store/local.rs | 18 +++---- datafusion/core/src/test/object_store.rs | 32 +++++++++++- datafusion/core/tests/path_partition.rs | 30 ++++++++++- 4 files changed, 117 insertions(+), 14 deletions(-) diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index 57cb35decede..9b39c38e9f51 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -966,7 +966,7 @@ mod roundtrip_tests { self, object_store::{ local::LocalFileSystem, FileMetaStream, ListEntryStream, ObjectReader, - ObjectStore, + ObjectStore, ObjectWriter, }, SizedFile, }, @@ -1016,6 +1016,55 @@ mod roundtrip_tests { "this is only a test object store".to_string(), )) } + + fn file_writer( + &self, + _path: &str, + ) -> datafusion_data_access::Result> { + unimplemented!(); + } + + async fn create_dir( + &self, + _path: &str, + _recursive: bool, + ) -> datafusion_data_access::Result<()> { + unimplemented!(); + } + + async fn remove_dir_all( + &self, + _path: &str, + ) -> datafusion_data_access::Result<()> { + unimplemented!(); + } + + async fn remove_dir_contents( + &self, + _path: &str, + ) -> datafusion_data_access::Result<()> { + unimplemented!(); + } + + async fn remove_file(&self, _path: &str) -> datafusion_data_access::Result<()> { + unimplemented!(); + } + + async fn rename( + &self, + _source: &str, + _dest: &str, + ) -> datafusion_data_access::Result<()> { + unimplemented!(); + } + + async fn copy( + &self, + _source: &str, + _dest: &str, + ) -> datafusion_data_access::Result<()> { + unimplemented!(); + } } // Given a identity of a LogicalPlan converts it to protobuf and back, using debug formatting to test equality. diff --git a/data-access/src/object_store/local.rs b/data-access/src/object_store/local.rs index 3c9b06d7143a..ec931dd396a0 100644 --- a/data-access/src/object_store/local.rs +++ b/data-access/src/object_store/local.rs @@ -117,36 +117,32 @@ impl ObjectStore for LocalFileSystem { } async fn copy(&self, source: &str, dest: &str) -> Result<()> { - let source_path = PathBuf::from(source); - let dest_path = PathBuf::from(dest); + let source_root = PathBuf::from(source); + let dest_root = PathBuf::from(dest); - if !source_path.exists() { + if !source_root.exists() { return Err(io::Error::new( io::ErrorKind::NotFound, "Source path not found", )); } - if dest_path.exists() && dest_path.is_dir() { + if dest_root.exists() && dest_root.is_dir() { return Err(io::Error::new( io::ErrorKind::AlreadyExists, "Cannot overwrite an existing directory.", )); } - if source_path.is_file() { + if source_root.is_file() { tokio::fs::copy(source, dest).await?; return Ok(()); } - self.create_dir(dest_path.clone().to_str().unwrap(), true) + self.create_dir(dest_root.clone().to_str().unwrap(), true) .await?; - let mut stack = Vec::new(); - stack.push(source_path.clone()); - - let source_root = PathBuf::from(source_path); - let dest_root = PathBuf::from(dest_path); + let mut stack = vec![source_root.clone()]; while let Some(working_path) = stack.pop() { let mut entries = tokio::fs::read_dir(working_path.clone()).await?; diff --git a/datafusion/core/src/test/object_store.rs b/datafusion/core/src/test/object_store.rs index cc9c02305a76..8997748c66be 100644 --- a/datafusion/core/src/test/object_store.rs +++ b/datafusion/core/src/test/object_store.rs @@ -23,7 +23,9 @@ use std::{ }; use crate::datafusion_data_access::{ - object_store::{FileMetaStream, ListEntryStream, ObjectReader, ObjectStore}, + object_store::{ + FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, ObjectWriter, + }, FileMeta, Result, SizedFile, }; use async_trait::async_trait; @@ -91,6 +93,34 @@ impl ObjectStore for TestObjectStore { )), } } + + fn file_writer(&self, _path: &str) -> Result> { + unimplemented!(); + } + + async fn create_dir(&self, _path: &str, _recursive: bool) -> Result<()> { + unimplemented!(); + } + + async fn remove_dir_all(&self, _path: &str) -> Result<()> { + unimplemented!(); + } + + async fn remove_dir_contents(&self, _path: &str) -> Result<()> { + unimplemented!(); + } + + async fn remove_file(&self, _path: &str) -> Result<()> { + unimplemented!(); + } + + async fn rename(&self, _source: &str, _dest: &str) -> Result<()> { + unimplemented!(); + } + + async fn copy(&self, _source: &str, _dest: &str) -> Result<()> { + unimplemented!(); + } } struct EmptyObjectReader(u64); diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs index 2e4000552857..9147b222a650 100644 --- a/datafusion/core/tests/path_partition.rs +++ b/datafusion/core/tests/path_partition.rs @@ -25,7 +25,7 @@ use datafusion::{ datafusion_data_access::{ object_store::{ local::LocalFileSystem, FileMetaStream, ListEntryStream, ObjectReader, - ObjectStore, + ObjectStore, ObjectWriter, }, FileMeta, SizedFile, }, @@ -532,4 +532,32 @@ impl ObjectStore for MirroringObjectStore { )), } } + + fn file_writer(&self, _path: &str) -> datafusion_data_access::Result> { + unimplemented!(); + } + + async fn create_dir(&self, _path: &str, _recursive: bool) -> datafusion_data_access::Result<()> { + unimplemented!(); + } + + async fn remove_dir_all(&self, _path: &str) -> datafusion_data_access::Result<()> { + unimplemented!(); + } + + async fn remove_dir_contents(&self, _path: &str) -> datafusion_data_access::Result<()> { + unimplemented!(); + } + + async fn remove_file(&self, _path: &str) -> datafusion_data_access::Result<()> { + unimplemented!(); + } + + async fn rename(&self, _source: &str, _dest: &str) -> datafusion_data_access::Result<()> { + unimplemented!(); + } + + async fn copy(&self, _source: &str, _dest: &str) -> datafusion_data_access::Result<()> { + unimplemented!(); + } } From 6d06270273dd67c968c3f107b3a9e780d8f1a22e Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 18 Apr 2022 20:58:47 -0700 Subject: [PATCH 08/14] Fix cargo.toml --- data-access/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-access/Cargo.toml b/data-access/Cargo.toml index f94c08c3b4c8..bfac84917f4a 100644 --- a/data-access/Cargo.toml +++ b/data-access/Cargo.toml @@ -34,7 +34,7 @@ path = "src/lib.rs" [dependencies] async-trait = "0.1.41" -chrono = { version = "0.4"} +chrono = { version = "0.4", features = ["std"] } futures = "0.3" parking_lot = "0.12" tempfile = "3" From ef0d987f61edfe19b6ff17a73f1f9860b731d139 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 18 Apr 2022 21:10:21 -0700 Subject: [PATCH 09/14] Add path_without_scheme --- data-access/src/object_store/local.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/data-access/src/object_store/local.rs b/data-access/src/object_store/local.rs index ec931dd396a0..d990eab5efc7 100644 --- a/data-access/src/object_store/local.rs +++ b/data-access/src/object_store/local.rs @@ -66,6 +66,7 @@ impl ObjectStore for LocalFileSystem { } async fn create_dir(&self, path: &str, recursive: bool) -> Result<()> { + let path = path_without_scheme(path); let res = match recursive { false => tokio::fs::create_dir(path).await, true => tokio::fs::create_dir_all(path).await, @@ -78,10 +79,12 @@ impl ObjectStore for LocalFileSystem { } async fn remove_dir_all(&self, path: &str) -> Result<()> { + let path = path_without_scheme(path); tokio::fs::remove_dir_all(path).await } async fn remove_dir_contents(&self, path: &str) -> Result<()> { + let path = path_without_scheme(path); let mut entries = tokio::fs::read_dir(path).await?; while let Some(entry) = entries.next_entry().await? { if entry.file_type().await?.is_dir() { @@ -94,6 +97,7 @@ impl ObjectStore for LocalFileSystem { } async fn remove_file(&self, path: &str) -> Result<()> { + let path = path_without_scheme(path); let res = tokio::fs::remove_file(path).await; match res { Ok(()) => Ok(()), @@ -113,12 +117,14 @@ impl ObjectStore for LocalFileSystem { } async fn rename(&self, source: &str, dest: &str) -> Result<()> { + let source = path_without_scheme(source); + let dest = path_without_scheme(dest); tokio::fs::rename(source, dest).await } async fn copy(&self, source: &str, dest: &str) -> Result<()> { - let source_root = PathBuf::from(source); - let dest_root = PathBuf::from(dest); + let source_root = PathBuf::from(path_without_scheme(source)); + let dest_root = PathBuf::from(path_without_scheme(dest)); if !source_root.exists() { return Err(io::Error::new( From 67746c8bd2eebcdc0cc8de74822cc6b5beb2a945 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 18 Apr 2022 21:12:01 -0700 Subject: [PATCH 10/14] format --- datafusion/core/tests/path_partition.rs | 28 ++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs index 9147b222a650..d87d02d8e844 100644 --- a/datafusion/core/tests/path_partition.rs +++ b/datafusion/core/tests/path_partition.rs @@ -533,11 +533,18 @@ impl ObjectStore for MirroringObjectStore { } } - fn file_writer(&self, _path: &str) -> datafusion_data_access::Result> { + fn file_writer( + &self, + _path: &str, + ) -> datafusion_data_access::Result> { unimplemented!(); } - async fn create_dir(&self, _path: &str, _recursive: bool) -> datafusion_data_access::Result<()> { + async fn create_dir( + &self, + _path: &str, + _recursive: bool, + ) -> datafusion_data_access::Result<()> { unimplemented!(); } @@ -545,7 +552,10 @@ impl ObjectStore for MirroringObjectStore { unimplemented!(); } - async fn remove_dir_contents(&self, _path: &str) -> datafusion_data_access::Result<()> { + async fn remove_dir_contents( + &self, + _path: &str, + ) -> datafusion_data_access::Result<()> { unimplemented!(); } @@ -553,11 +563,19 @@ impl ObjectStore for MirroringObjectStore { unimplemented!(); } - async fn rename(&self, _source: &str, _dest: &str) -> datafusion_data_access::Result<()> { + async fn rename( + &self, + _source: &str, + _dest: &str, + ) -> datafusion_data_access::Result<()> { unimplemented!(); } - async fn copy(&self, _source: &str, _dest: &str) -> datafusion_data_access::Result<()> { + async fn copy( + &self, + _source: &str, + _dest: &str, + ) -> datafusion_data_access::Result<()> { unimplemented!(); } } From c4dd1e9581ee3e30ae7bef545f2367822fe52565 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 18 Apr 2022 22:06:24 -0700 Subject: [PATCH 11/14] Fix write issue --- data-access/src/object_store/local.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data-access/src/object_store/local.rs b/data-access/src/object_store/local.rs index d990eab5efc7..f97a42fdfb02 100644 --- a/data-access/src/object_store/local.rs +++ b/data-access/src/object_store/local.rs @@ -490,7 +490,7 @@ mod tests { // rename replaces files let test_content = b"test"; let mut f = File::create(&a_path)?; - f.write(test_content)?; + f.write_all(test_content)?; f.flush()?; fs.rename(a_path.to_str().unwrap(), b_path.to_str().unwrap()) .await?; @@ -565,7 +565,7 @@ mod tests { // Copy replaces files let test_content = b"test"; let mut f = File::create(&a_path)?; - f.write(test_content)?; + f.write_all(test_content)?; f.flush()?; fs.copy(a_path.to_str().unwrap(), b_path.to_str().unwrap()) .await?; From e7437cc1517d39ff4dd26a9f44247f40e283150f Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 26 Apr 2022 21:07:39 -0700 Subject: [PATCH 12/14] Start to improve documentation --- data-access/Cargo.toml | 2 +- data-access/src/lib.rs | 10 +++++++ data-access/src/object_store/local.rs | 39 ++++++++++++++++++++++++++- data-access/src/object_store/mod.rs | 28 ++++++++++++++++--- 4 files changed, 74 insertions(+), 5 deletions(-) diff --git a/data-access/Cargo.toml b/data-access/Cargo.toml index bfac84917f4a..390732d69e6f 100644 --- a/data-access/Cargo.toml +++ b/data-access/Cargo.toml @@ -38,4 +38,4 @@ chrono = { version = "0.4", features = ["std"] } futures = "0.3" parking_lot = "0.12" tempfile = "3" -tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] } +tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot", "io-util"] } diff --git a/data-access/src/lib.rs b/data-access/src/lib.rs index 5da690ad130b..839d4b43e5d3 100644 --- a/data-access/src/lib.rs +++ b/data-access/src/lib.rs @@ -15,6 +15,16 @@ // specific language governing permissions and limitations // under the License. +//! Datafusion Data Access provides general traits and structs +//! for data access abstractions. The main trait is the +//! [object_store::ObjectStore] traits, which defines a filesystem-like +//! interfaces that can be implemented by a variety of systems. This +//! create provides local filesystem implementation for ObjectStore +//! as [object_store::local::LocalFileSystem]. +//! +//! Alternative object stores are provided by separate crates, including: +//! * [S3FileSystem](https://docs.rs/datafusion-objectstore-s3/latest/datafusion_objectstore_s3/) + pub mod object_store; use chrono::{DateTime, Utc}; diff --git a/data-access/src/object_store/local.rs b/data-access/src/object_store/local.rs index f97a42fdfb02..e6c52704c209 100644 --- a/data-access/src/object_store/local.rs +++ b/data-access/src/object_store/local.rs @@ -15,7 +15,44 @@ // specific language governing permissions and limitations // under the License. -//! Object store that represents the Local File System. +//! Provides object store implementation [LocalFileSystem], which wraps +//! file system operations in tokio. +//! +//! ``` +//! use tempfile::tempdir; +//! use tokio::io::{AsyncRead, AsyncWrite}; +//! use future::StreamExt; +//! use datafusion_data_access::object_store::ObjectStore; +//! use datafusion_data_access::object_store::local::LocalFileSystem; +//! +//! async { +//! let fs = LocalFileSystem; +//! let tmp_dir = tempdir()?.path(); +//! let dir_path = tmp_dir.join("x"); +//! let file_path = tmp_dir.join("a.txt"); +//! +//! // Create dir +//! fs.create_dir(dir_path.to_str().unwrap(), true).await?; +//! +//! // Write a file +//! let writer = fs.file_writer(file_path.to_str().unwrap())?.writer().await?; +//! writer.write_all("test").await?; +//! writer.shutdown(); +//! +//! // List files +//! let files = fs.list_file(dir_path.to_str().unwrap()).await?; +//! let Some(file) = files.next(); +//! assert_eq!(file.path, file_path.to_str().unwrap()); +//! +//! // Read data back +//! let reader = fs.file_reader(file)?.chunk_reader().await?; +//! let data = reader.read_all().await?; +//! assert_eq!(data, "test"); +//! +//! // Clear dir +//! fs.remove_dir_all(dir_path.to_str().unwrap()).await?; +//! }; +//! ``` use std::fs::{self, File, Metadata}; use std::io::{self, BufReader, Read, Seek, SeekFrom, Write}; diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index 0db3bca88272..88024bd0d27d 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -101,7 +101,15 @@ pub trait ObjectStore: Sync + Send + Debug { } /// Returns all the files in `prefix` if the `prefix` is already a leaf dir, - /// or all paths between the `prefix` and the first occurrence of the `delimiter` if it is provided. + /// or all paths between the `prefix` and the first occurrence of the `delimiter` + /// if it is provided. + /// + /// # Arguments + /// + /// * `prefix` - + /// * `delimiter` - + /// + /// # Example async fn list_dir( &self, prefix: &str, @@ -116,7 +124,10 @@ pub trait ObjectStore: Sync + Send + Debug { /// Create directory, recursively if requested /// - /// If directory already exists, will return Ok + /// If directory already exists, will return Ok. + /// In many cases, object stores don't have a notion of directories, so this + /// might do nothing except check that a file with the same path doesn't + /// already exist. async fn create_dir(&self, path: &str, recursive: bool) -> Result<()>; /// Delete directory and its contents, recursively @@ -138,7 +149,18 @@ pub trait ObjectStore: Sync + Send + Debug { /// If dest exists, source will replace it unless dest is a non-empty directory, /// in which case an [std::io::ErrorKind::AlreadyExists] or /// [std::io::ErrorKind::DirectoryNotEmpty] error will be returned - async fn rename(&self, source: &str, dest: &str) -> Result<()>; + /// + /// In many implementations, this will simply perform a copy and then delete + /// source. + async fn rename(&self, source: &str, dest: &str) -> Result<()> { + self.copy(source, dest).await?; + // if is_file { + // self.remove_file(source).await + // } else { + // self.remove_dir_all(source).await + // } + todo!(); + } /// Copy a file or directory /// From 531f757a187dae98212d6faccd7df2b069e37b29 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 26 Apr 2022 21:08:18 -0700 Subject: [PATCH 13/14] Scaffold a test suite that could be reused by implementors --- data-access/src/object_store/local.rs | 8 +++-- data-access/src/object_store/mod.rs | 52 ++++++++++++++++++++++++++- 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/data-access/src/object_store/local.rs b/data-access/src/object_store/local.rs index e6c52704c209..5d7d94ab4401 100644 --- a/data-access/src/object_store/local.rs +++ b/data-access/src/object_store/local.rs @@ -69,12 +69,12 @@ use crate::{FileMeta, Result, SizedFile}; use super::{ path_without_scheme, FileMetaStream, ListEntryStream, ObjectReader, - ObjectReaderStream, ObjectStore, ObjectWriter, + ObjectReaderStream, ObjectStore, ObjectWriter }; pub static LOCAL_SCHEME: &str = "file"; -#[derive(Debug)] +#[derive(Debug, Default)] /// Local File System as Object Store. pub struct LocalFileSystem; @@ -326,6 +326,7 @@ pub fn local_object_reader(file: String) -> Arc { .expect("File not found") } +// TODO: Why would a user want this function that unwraps? unless we said it's unsafe? /// Helper method to fetch the file size and date at given path and create a `FileMeta` pub fn local_unpartitioned_file(file: String) -> FileMeta { let metadata = fs::metadata(&file).expect("Local file metadata"); @@ -364,6 +365,7 @@ impl ObjectWriter for LocalFileWriter { #[cfg(test)] mod tests { use super::*; + use crate::test_object_store; use futures::StreamExt; use std::collections::HashSet; use std::fs::File; @@ -638,4 +640,6 @@ mod tests { Ok(()) } + + test_object_store!(LocalFileSystem::default, tempdir); } diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index 88024bd0d27d..f98b8cd565fc 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -135,13 +135,14 @@ pub trait ObjectStore: Sync + Send + Debug { /// Delete directory contents recursively /// - /// Unlike [delete_dir], will not delete directory itself + /// Unlike [delete_dir](#method.delete_dir), will not delete directory itself async fn remove_dir_contents(&self, path: &str) -> Result<()>; /// Delete a file /// /// If file does not exist, will return error kind [std::io::ErrorKind::NotFound] /// If attempted on a directory, will return error kind [std::io::ErrorKind::InvalidInput] + /// async fn remove_file(&self, path: &str) -> Result<()>; /// Rename a file or directory @@ -186,3 +187,52 @@ pub fn path_without_scheme(full_path: &str) -> &str { full_path } } + +pub mod testing { + use super::*; + + pub async fn list_dir_suite(store: impl ObjectStore, base_path: &str) -> Result<()> { + // base/a.txt + // base/x/b.txt + // base/y/c.txt + store.create_dir("x", false).await?; + store.create_dir("y", false).await?; + + // TODO + + Ok(()) + } + + pub async fn read_write_suite(store: impl ObjectStore, base_path: &str) -> Result<()> { + // TODO + Ok(()) + } + + #[macro_export] + macro_rules! store_test { + ($test_name:ident, $store_factory:expr, $base_path_factory:expr) => { + #[tokio::test] + async fn $test_name() -> Result<()> { + let object_store = $store_factory(); + let base_path = $base_path_factory()?; + let base_path_str = base_path.path().to_str().unwrap(); + crate::object_store::testing::$test_name(object_store, base_path_str).await + } + } + } + + /// Run the standard ObjectStore test suite + /// + /// + #[macro_export] + macro_rules! test_object_store { + ($store_factory:expr, $base_path_factory:expr) => { + mod object_store_test { + use super::*; + use crate::store_test; + store_test!(list_dir_suite, $store_factory, $base_path_factory); + store_test!(read_write_suite, $store_factory, $base_path_factory); + } + }; + } +} From 66e7da07be154986b9416aab1b038ead85533c23 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 27 Apr 2022 21:08:16 -0700 Subject: [PATCH 14/14] Fix writer and start testing --- data-access/src/object_store/local.rs | 40 ++++++++++----- data-access/src/object_store/mod.rs | 74 +++++++++++++++++++++++---- 2 files changed, 91 insertions(+), 23 deletions(-) diff --git a/data-access/src/object_store/local.rs b/data-access/src/object_store/local.rs index 5d7d94ab4401..59c4c5f2cea4 100644 --- a/data-access/src/object_store/local.rs +++ b/data-access/src/object_store/local.rs @@ -15,22 +15,22 @@ // specific language governing permissions and limitations // under the License. -//! Provides object store implementation [LocalFileSystem], which wraps +//! Provides object store implementation [LocalFileSystem], which wraps //! file system operations in tokio. -//! +//! //! ``` //! use tempfile::tempdir; //! use tokio::io::{AsyncRead, AsyncWrite}; //! use future::StreamExt; //! use datafusion_data_access::object_store::ObjectStore; //! use datafusion_data_access::object_store::local::LocalFileSystem; -//! +//! //! async { //! let fs = LocalFileSystem; //! let tmp_dir = tempdir()?.path(); //! let dir_path = tmp_dir.join("x"); //! let file_path = tmp_dir.join("a.txt"); -//! +//! //! // Create dir //! fs.create_dir(dir_path.to_str().unwrap(), true).await?; //! @@ -38,23 +38,23 @@ //! let writer = fs.file_writer(file_path.to_str().unwrap())?.writer().await?; //! writer.write_all("test").await?; //! writer.shutdown(); -//! +//! //! // List files //! let files = fs.list_file(dir_path.to_str().unwrap()).await?; //! let Some(file) = files.next(); //! assert_eq!(file.path, file_path.to_str().unwrap()); -//! +//! //! // Read data back //! let reader = fs.file_reader(file)?.chunk_reader().await?; //! let data = reader.read_all().await?; //! assert_eq!(data, "test"); -//! +//! //! // Clear dir //! fs.remove_dir_all(dir_path.to_str().unwrap()).await?; //! }; //! ``` -use std::fs::{self, File, Metadata}; +use std::fs::{self, File, Metadata, OpenOptions}; use std::io::{self, BufReader, Read, Seek, SeekFrom, Write}; use std::path::PathBuf; use std::pin::Pin; @@ -63,13 +63,14 @@ use std::sync::Arc; use async_trait::async_trait; use chrono::{DateTime, Utc}; use futures::{stream, AsyncRead, StreamExt}; -use tokio::{fs::File as AsyncFile, io::AsyncWrite}; +use tokio::fs::OpenOptions as AsyncOpenOptions; +use tokio::io::AsyncWrite; use crate::{FileMeta, Result, SizedFile}; use super::{ path_without_scheme, FileMetaStream, ListEntryStream, ObjectReader, - ObjectReaderStream, ObjectStore, ObjectWriter + ObjectReaderStream, ObjectStore, ObjectWriter, }; pub static LOCAL_SCHEME: &str = "file"; @@ -352,13 +353,24 @@ impl LocalFileWriter { #[async_trait] impl ObjectWriter for LocalFileWriter { async fn writer(&self) -> Result>> { - let file = AsyncFile::open(&self.path).await?; - Ok(Box::pin(file)) + Ok(Box::pin( + AsyncOpenOptions::new() + .write(true) + .append(false) + .create(true) + .open(&self.path) + .await?, + )) } fn sync_writer(&self) -> Result> { - let file = File::open(&self.path)?; - Ok(Box::new(file)) + Ok(Box::new( + OpenOptions::new() + .write(true) + .append(false) + .create(true) + .open(&self.path)?, + )) } } diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs index f98b8cd565fc..566334bdb4be 100644 --- a/data-access/src/object_store/mod.rs +++ b/data-access/src/object_store/mod.rs @@ -71,6 +71,8 @@ pub trait ObjectReader: Send + Sync { /// Object Writer for one file in an object store. #[async_trait] pub trait ObjectWriter: Send + Sync { + // TODO: Should async writer be Send + Sync as well? + // TODO: writer always overwrites not append (but we can add append_writer() later) async fn writer(&self) -> Result>>; fn sync_writer(&self) -> Result>; @@ -190,20 +192,70 @@ pub fn path_without_scheme(full_path: &str) -> &str { pub mod testing { use super::*; + use futures::stream::TryStreamExt; + use std::collections::HashSet; + use std::path::Path; + use tokio::io::AsyncWriteExt; + + pub async fn simple_write_file( + path: &Path, + content: &str, + store: &impl ObjectStore, + ) -> Result<()> { + dbg!(path.to_str().unwrap()); + let mut writer = store.file_writer(path.to_str().unwrap())?.writer().await?; + writer.write_all(content.as_bytes()).await?; + writer.flush().await?; + Ok(()) + } - pub async fn list_dir_suite(store: impl ObjectStore, base_path: &str) -> Result<()> { + pub async fn list_dir_suite(store: impl ObjectStore, base_path: &Path) -> Result<()> { // base/a.txt // base/x/b.txt // base/y/c.txt - store.create_dir("x", false).await?; - store.create_dir("y", false).await?; - - // TODO + store.create_dir(base_path.to_str().unwrap(), false).await?; + store + .create_dir(base_path.join("x").to_str().unwrap(), false) + .await?; + store + .create_dir(base_path.join("y").to_str().unwrap(), false) + .await?; + simple_write_file(&base_path.join("a.txt"), "test", &store).await?; + simple_write_file(&base_path.join("x").join("b.txt"), "", &store).await?; + simple_write_file(&base_path.join("y").join("c.txt"), "", &store).await?; + + // list_file() will return all files recursively, but no directories. + let files = store.list_file(base_path.to_str().unwrap()).await?; + let expected = HashSet::from([ + base_path + .join("a.txt") + .into_os_string() + .into_string() + .unwrap(), + base_path + .join("x") + .join("b.txt") + .into_os_string() + .into_string() + .unwrap(), + base_path + .join("y") + .join("c.txt") + .into_os_string() + .into_string() + .unwrap(), + ]); + let actual: HashSet = + files.map_ok(|f| f.sized_file.path).try_collect().await?; + assert_eq!(expected, actual); Ok(()) } - pub async fn read_write_suite(store: impl ObjectStore, base_path: &str) -> Result<()> { + pub async fn read_write_suite( + store: impl ObjectStore, + base_path: &Path, + ) -> Result<()> { // TODO Ok(()) } @@ -215,10 +267,13 @@ pub mod testing { async fn $test_name() -> Result<()> { let object_store = $store_factory(); let base_path = $base_path_factory()?; - let base_path_str = base_path.path().to_str().unwrap(); - crate::object_store::testing::$test_name(object_store, base_path_str).await + dbg!(&base_path); + crate::object_store::testing::$test_name(object_store, base_path.path()) + .await?; + dbg!(&base_path); + Ok(()) } - } + }; } /// Run the standard ObjectStore test suite @@ -230,6 +285,7 @@ pub mod testing { mod object_store_test { use super::*; use crate::store_test; + use crate::Result; store_test!(list_dir_suite, $store_factory, $base_path_factory); store_test!(read_write_suite, $store_factory, $base_path_factory); }