Skip to content
This repository has been archived by the owner on Jul 27, 2022. It is now read-only.

Commit

Permalink
fix!: Use from and to so we avoid conflict with 'source'
Browse files Browse the repository at this point in the history
  • Loading branch information
wjones127 committed May 22, 2022
1 parent 06f4114 commit 4c097b8
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 88 deletions.
5 changes: 2 additions & 3 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ mod platform_cfg {
if ver.major >= 2 && ver.minor >= 28 {
println!("cargo:rustc-cfg=glibc_renameat2");
} else {
let more_info = "https://docs.rs/deltalake/latest/deltalake/storage/file/struct.FileStorageBackend.html";
println!(
"cargo:warning=glibc version >= 2.28 is required for performing commits to local file system, glibc version found {}.{}. For more information: {}",
ver.major, ver.minor, more_info
"cargo:warning=glibc version >= 2.28 is required for rename_no_replace on local file system, glibc version found {}.{}.",
ver.major, ver.minor
);
}
}
Expand Down
26 changes: 13 additions & 13 deletions src/aws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,17 @@ enum Error {
},

#[snafu(display(
"Unable to copy object. Bucket: {}, Source: {}, Dest: {}, Error: {}",
"Unable to copy object. Bucket: {}, From: {}, To: {}, Error: {}",
bucket,
src,
dest,
from,
to,
source,
))]
UnableToCopyObject {
source: rusoto_core::RusotoError<rusoto_s3::CopyObjectError>,
bucket: String,
src: String,
dest: String,
from: String,
to: String,
},

#[snafu(display(
Expand Down Expand Up @@ -418,15 +418,15 @@ impl ObjectStore for AmazonS3 {
.await?)
}

async fn copy(&self, source: &Path, dest: &Path) -> Result<()> {
let source = source.to_raw();
let dest = dest.to_raw();
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
let from = from.to_raw();
let to = to.to_raw();
let bucket_name = self.bucket_name.clone();

let request_factory = move || rusoto_s3::CopyObjectRequest {
bucket: bucket_name.clone(),
copy_source: format!("{}/{}", &bucket_name, source),
key: dest.to_string(),
copy_source: format!("{}/{}", &bucket_name, from),
key: to.to_string(),
..Default::default()
};

Expand All @@ -440,8 +440,8 @@ impl ObjectStore for AmazonS3 {
.await
.context(UnableToCopyObjectSnafu {
bucket: &self.bucket_name,
src: source,
dest,
from,
to,
})?;

Ok(())
Expand Down Expand Up @@ -768,7 +768,7 @@ mod tests {
use crate::{
tests::{
get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter,
put_get_delete_list, rename_and_copy
put_get_delete_list, rename_and_copy,
},
Error as ObjectStoreError, ObjectStore,
};
Expand Down
44 changes: 15 additions & 29 deletions src/gcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,31 +78,17 @@ enum Error {
},

#[snafu(display(
"Unable to copy object. Bucket: {}, Source: {}, Dest: {}, Error: {}",
"Unable to copy object. Bucket: {}, From: {}, To: {}, Error: {}",
bucket,
src,
dest,
from,
to,
source,
))]
UnableToCopyObject {
source: cloud_storage::Error,
bucket: String,
src: String,
dest: String,
},

#[snafu(display(
"Unable to rename object. Bucket: {}, Source: {}, Dest: {}, Error: {}",
bucket,
src,
dest,
source,
))]
UnableToRenameObject {
source: cloud_storage::Error,
bucket: String,
src: String,
dest: String,
from: String,
to: String,
},

NotFound {
Expand Down Expand Up @@ -305,39 +291,39 @@ impl ObjectStore for GoogleCloudStorage {
Ok(result)
}

async fn copy(&self, source: &Path, dest: &Path) -> Result<()> {
let source = source.to_raw();
let dest = dest.to_raw();
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
let from = from.to_raw();
let to = to.to_raw();
let bucket_name = self.bucket_name.clone();

let source_obj = self
.client
.object()
.read(&bucket_name, source)
.read(&bucket_name, from)
.await
.map_err(|e| match e {
cloud_storage::Error::Google(ref error) if error.error.code == 404 => {
Error::NotFound {
path: source.to_string(),
path: from.to_string(),
source: e,
}
}
_ => Error::UnableToCopyObject {
bucket: self.bucket_name.clone(),
src: source.to_string(),
dest: dest.to_string(),
from: from.to_string(),
to: to.to_string(),
source: e,
},
})?;
// TODO: Handle different buckets?
self.client
.object()
.copy(&source_obj, &bucket_name, dest)
.copy(&source_obj, &bucket_name, to)
.await
.context(UnableToCopyObjectSnafu {
bucket: self.bucket_name.clone(),
src: source.to_string(),
dest: dest.to_string(),
from: from.to_string(),
to: to.to_string(),
})?;

Ok(())
Expand Down
10 changes: 5 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,24 +76,24 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// Copy an object from one path to another in the same object store.
///
/// If there exists an object at the destination, it will be overwritten.
async fn copy(&self, source: &Path, dest: &Path) -> Result<()>;
async fn copy(&self, from: &Path, to: &Path) -> Result<()>;

/// Move an object from one path to another in the same object store.
///
/// By default, this is implemented as a copy and then delete source. It may not
/// check when deleting source that it was the same object that was originally copied.
///
/// If there exists an object at the destination, it will be overwritten.
async fn rename(&self, source: &Path, dest: &Path) -> Result<()> {
self.copy(source, dest).await?;
self.delete(source).await?;
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
self.copy(from, to).await?;
self.delete(from).await?;
Ok(())
}

/// Move an object from one path to another, only if destination is empty.
///
/// Will return an error if the destination already has an object.
async fn rename_no_replace(&self, source: &Path, dest: &Path) -> Result<()>;
async fn rename_no_replace(&self, from: &Path, to: &Path) -> Result<()>;
}

/// Result of a list call that includes objects, prefixes (directories) and a
Expand Down
46 changes: 23 additions & 23 deletions src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,17 @@ pub(crate) enum Error {
path: std::path::PathBuf,
},

#[snafu(display("Unable to copy file from {} to {}: {}", src.display(), dest.display(), source))]
#[snafu(display("Unable to copy file from {} to {}: {}", from.display(), to.display(), source))]
UnableToCopyFile {
src: std::path::PathBuf,
dest: std::path::PathBuf,
from: std::path::PathBuf,
to: std::path::PathBuf,
source: io::Error,
},

#[snafu(display("Unable to rename file from {} to {}: {}", src.display(), dest.display(), source))]
#[snafu(display("Unable to rename file from {} to {}: {}", from.display(), to.display(), source))]
UnableToRenameFile {
src: std::path::PathBuf,
dest: std::path::PathBuf,
from: std::path::PathBuf,
to: std::path::PathBuf,
source: io::Error,
},

Expand Down Expand Up @@ -252,38 +252,38 @@ impl ObjectStore for LocalFileSystem {
})
}

async fn copy(&self, source: &Path, dest: &Path) -> Result<()> {
let source = self.path_to_filesystem(source);
let dest = self.path_to_filesystem(dest);
tokio::fs::copy(&source, &dest)
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
let from = self.path_to_filesystem(from);
let to = self.path_to_filesystem(to);
tokio::fs::copy(&from, &to)
.await
.context(UnableToCopyFileSnafu { src: source, dest })?;
.context(UnableToCopyFileSnafu { from, to })?;
Ok(())
}

async fn rename(&self, source: &Path, dest: &Path) -> Result<()> {
let source = self.path_to_filesystem(source);
let dest = self.path_to_filesystem(dest);
tokio::fs::rename(&source, &dest)
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
let from = self.path_to_filesystem(from);
let to = self.path_to_filesystem(to);
tokio::fs::rename(&from, &to)
.await
.context(UnableToCopyFileSnafu { src: source, dest })?;
.context(UnableToCopyFileSnafu { from, to })?;
Ok(())
}

async fn rename_no_replace(&self, source: &Path, dest: &Path) -> Result<()> {
let source = self.path_to_filesystem(source);
let dest = self.path_to_filesystem(dest);
imp::rename_no_replace(&source, &dest)
async fn rename_no_replace(&self, from: &Path, to: &Path) -> Result<()> {
let from = self.path_to_filesystem(from);
let to = self.path_to_filesystem(to);
imp::rename_no_replace(&from, &to)
.await
.map_err(|err| match err.kind() {
io::ErrorKind::AlreadyExists => Error::AlreadyExists {
path: dest.to_str().unwrap().to_string(),
path: to.to_str().unwrap().to_string(),
source: err,
}
.into(),
_ => Error::UnableToRenameFile {
src: source,
dest,
from,
to,
source: err,
}
.into(),
Expand Down
21 changes: 11 additions & 10 deletions src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,23 +140,23 @@ impl ObjectStore for InMemory {
})
}

async fn copy(&self, source: &Path, dest: &Path) -> Result<()> {
let data = self.get_bytes(source).await?;
self.storage.write().await.insert(dest.clone(), data);
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
let data = self.get_bytes(from).await?;
self.storage.write().await.insert(to.clone(), data);
Ok(())
}

async fn rename_no_replace(&self, source: &Path, dest: &Path) -> Result<()> {
let data = self.get_bytes(source).await?;
async fn rename_no_replace(&self, from: &Path, to: &Path) -> Result<()> {
let data = self.get_bytes(from).await?;
let mut storage = self.storage.write().await;
if storage.contains_key(dest) {
if storage.contains_key(to) {
return Err(Error::AlreadyExists {
path: dest.to_string(),
path: to.to_string(),
}
.into());
}
storage.insert(dest.clone(), data);
storage.remove(source);
storage.insert(to.clone(), data);
storage.remove(from);
Ok(())
}
}
Expand Down Expand Up @@ -196,7 +196,7 @@ mod tests {
use crate::{
tests::{
get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter,
put_get_delete_list, rename_and_copy,
put_get_delete_list, rename_and_copy, rename_no_replace,
},
Error as ObjectStoreError, ObjectStore,
};
Expand All @@ -209,6 +209,7 @@ mod tests {
list_uses_directories_correctly(&integration).await.unwrap();
list_with_delimiter(&integration).await.unwrap();
rename_and_copy(&integration).await.unwrap();
rename_no_replace(&integration).await.unwrap();
}

#[tokio::test]
Expand Down
11 changes: 6 additions & 5 deletions src/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,16 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
}
}

async fn copy(&self, source: &Path, dest: &Path) -> Result<()> {
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
sleep(self.config().wait_delete_per_call).await;

self.inner.copy(source, dest).await
self.inner.copy(from, to).await
}

async fn rename_no_replace(&self, source: &Path, dest: &Path) -> Result<()> {
async fn rename_no_replace(&self, from: &Path, to: &Path) -> Result<()> {
sleep(self.config().wait_delete_per_call).await;

self.inner.rename_no_replace(source, dest).await
self.inner.rename_no_replace(from, to).await
}
}

Expand All @@ -231,7 +231,7 @@ mod tests {
memory::InMemory,
tests::{
list_uses_directories_correctly, list_with_delimiter, put_get_delete_list,
rename_and_copy,
rename_and_copy, rename_no_replace,
},
};
use bytes::Bytes;
Expand Down Expand Up @@ -264,6 +264,7 @@ mod tests {
list_uses_directories_correctly(&store).await.unwrap();
list_with_delimiter(&store).await.unwrap();
rename_and_copy(&store).await.unwrap();
rename_no_replace(&integration).await.unwrap();
}

#[tokio::test]
Expand Down

0 comments on commit 4c097b8

Please sign in to comment.