diff --git a/Cargo.toml b/Cargo.toml index 3392bd9eb..43fd0bb8e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,13 +23,16 @@ rust-version.workspace = true [features] default = ["auth_jwt", "cli", "with-db"] - auth_jwt = ["dep:jsonwebtoken"] cli = ["dep:clap"] testing = ["dep:axum-test"] with-db = ["dep:sea-orm", "dep:sea-orm-migration"] channels = ["dep:socketioxide"] - +# Storage features +all_storage = ["storage_aws_s3", "storage_azure", "storage_gcp"] +storage_aws_s3 = ["object_store/aws"] +storage_azure = ["object_store/azure"] +storage_gcp = ["object_store/gcp"] [dependencies] @@ -102,12 +105,11 @@ bcrypt = { version = "0.15.0", optional = true } validator = { version = "0.16.1", features = ["derive"] } futures-util = "0.3" tower = "0.4" -hyper = "1" +hyper = "1.1" mime = "0.3" bytes = "1.1" -axum-test = { version = "14.0.0-rc.1", optional = true } - +axum-test = { version = "14.3.0", optional = true } # gen rrgen = "0.5.3" @@ -119,8 +121,13 @@ cfg-if = "1" uuid = { version = "1.6", features = ["v4"] } requestty = "0.5.0" +# A socket.io server implementation socketioxide = { version = "0.10.0", features = ["state"], optional = true } + +# File Upload +object_store = { version = "0.9.0", default-features = false } + [workspace.dependencies] async-trait = { version = "0.1.74" } axum = { version = "0.7.1", features = ["macros"] } @@ -137,7 +144,6 @@ features = [ "sqlx-sqlite", ] - [dev-dependencies] rstest = "0.18.2" insta = { version = "1.34.0", features = ["redactions", "yaml", "filters"] } diff --git a/docs-site/content/docs/testing/storage.md b/docs-site/content/docs/testing/storage.md new file mode 100644 index 000000000..d55564b6d --- /dev/null +++ b/docs-site/content/docs/testing/storage.md @@ -0,0 +1,41 @@ ++++ +title = "Storage" +description = "" +date = 2021-05-01T18:20:00+00:00 +updated = 2021-05-01T18:20:00+00:00 +draft = false +weight = 23 +sort_by = "weight" +template = "docs/page.html" + +[extra] +lead = "" +toc = true +top = false ++++ + +By 
testing file storage in your controller you can follow this example: +```rust +#[tokio::test] +#[serial] +async fn can_register() { + testing::request::(|request, ctx| async move { + let file_content = "loco file upload"; + let file_part = Part::bytes(file_content.as_bytes()).file_name("loco.txt"); + + let multipart_form = MultipartForm::new().add_part("file", file_part); + + let response = request.post("/upload/file").multipart(multipart_form).await; + + response.assert_status_ok(); + + let res: views::upload::Response = serde_json::from_str(&response.text()).unwrap(); + + let stored_file: String = ctx.storage.unwrap().download(&res.path).await.unwrap(); + + assert_eq!(stored_file, file_content); + }) + .await; +} +``` + diff --git a/docs-site/content/docs/the-app/storage.md b/docs-site/content/docs/the-app/storage.md new file mode 100644 index 000000000..324bb667f --- /dev/null +++ b/docs-site/content/docs/the-app/storage.md @@ -0,0 +1,193 @@ ++++ +title = "Storage" +description = "" +date = 2024-02-07T08:00:00+00:00 +updated = 2024-02-07T08:00:00+00:00 +draft = false +weight = 21 +sort_by = "weight" +template = "docs/page.html" + +[extra] +lead = "" +toc = true +top = false ++++ + +In Loco Storage, we facilitate working with files through multiple operations. Storage can be in-memory, on disk, or use cloud services such as AWS S3, GCP, and Azure. + +Loco supports simple storage operations and advanced features like mirroring data or backup strategies with different failure modes. + +By default, in-memory and disk storage come out of the box. To work with cloud providers, you should specify the following features: +- `storage_aws_s3` +- `storage_azure` +- `storage_gcp` +- `all_storage` + +## Setup + +Add the `storage` function as a Hook in the `app.rs` file and import the `storage` module from `loco_rs`. 
+ +```rust +use loco_rs::storage; + +impl Hooks for App { + async fn storage(_config: &Config, environment: &Environment) -> Result> { + return Ok(None); + } +} +``` + +This hook returns a Storage instance that holds all storage configurations, covered in the next sections. This Storage instance is stored as part of the application context and is available in controllers, endpoints, task workers, and more. + +## Glossary +| | | +| - | - | +| `driver` | Trait implementation for diverse storage operations. | +| `Storage`| Abstraction implementation for managing one or more storage operations. | +| `Strategy`| Trait implementing various strategies for operations, such as mirror or backup. | +| `FailureMode`| Implemented within each strategy, determining how to handle operations in case of failures. | + +### Initialize Storage + +Storage can be configured with a single driver or multiple drivers. + +#### Single Store + +In this example, we initialize the in-memory driver and create a new storage with the single function. + +```rust +use loco_rs::storage; +async fn storage( + _config: &Config, + environment: &Environment, + ) -> Result> { + let storage = Storage::single(storage::drivers::mem::new()); + return Ok(Some(storage)); + } +``` + +### Multiple Drivers + +For advanced usage, you can set up multiple drivers and apply smart strategies that come out of the box. Each strategy has its own set of failure modes that you can decide how to handle. + +Creating multiple drivers: + +```rust +use crate::storage::{drivers, Storage}; + +let aws_1 = drivers::aws::new("users"); +let azure = drivers::azure::new("users"); +let aws_2 = drivers::aws::new("users-mirror"); +``` + +#### Mirror Strategy: +You can keep multiple services in sync by defining a mirror service. A mirror service **replicates** uploads, deletes, rename and copy across two or more subordinate services. 
The download behavior redundantly retrieves data, meaning if the file retrieval fails from the primary, the first file found in the secondaries is returned. #### Behavior After creating the three store instances, we need to create the mirror strategy instance and define the failure mode. The mirror strategy expects the primary store and a list of secondary stores, along with failure mode options: - `MirrorAll`: All secondary storages must succeed. If one fails, the operation continues to the rest but returns an error. - `AllowMirrorFailure`: The operation does not return an error when one or more mirror operations fail. The failure mode is relevant for upload, delete, move, and copy. Example: ```rust  // Define the mirror strategy by setting the primary store and secondary stores by names. let strategy = Box::new(MirrorStrategy::new( "store_1", Some(vec!["store_2".to_string(), "store_3".to_string()]), FailureMode::MirrorAll, )) as Box; // Create the storage with the store mapping and the strategy.  let storage = Storage::new( BTreeMap::from([ ("store_1".to_string(), aws_1.clone()), ("store_2".to_string(), azure.clone()), ("store_3".to_string(), aws_2.clone()), ]), strategy.into(), ); ``` ### Backup Strategy: You can back up your operations across multiple storages and control the failure mode policy. After creating the three store instances, we need to create the backup strategy instance and define the failure mode. The backup strategy expects the primary store and a list of secondary stores, along with failure mode options: - `BackupAll`: All secondary storages must succeed. If one fails, the operation continues to the rest but returns an error. - `AllowBackupFailure`: The operation does not return an error when one or more backup operations fail. - `AtLeastOneFailure`: At least one operation should pass. - `CountFailure`: The given number of backups should pass. 
+ +The failure mode is relevant for upload, delete, move, and copy. The download always retrieves the file from the primary. + +Example: +```rust + +// Define the backup strategy by setting the primary store and secondary stores by names. +let strategy: Box = Box::new(BackupStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::AllowBackupFailure, +)) as Box; + +let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), +); +``` + +## Create Your Own Strategy + +In case you have a specific strategy, you can easily create it by implementing the StorageStrategyTrait and implementing all store functionality. + +## Usage In Controller + +Follow this example, make sure you enable `multipart` feature in axum crate. + +```rust +async fn upload_file( + State(ctx): State, + mut multipart: Multipart, +) -> Result> { + let mut file = None; + while let Some(field) = multipart.next_field().await.map_err(|err| { + tracing::error!(error = ?err,"could not readd multipart"); + Error::BadRequest("could not readd multipart".into()) + })? 
{ + let file_name = match field.file_name() { + Some(file_name) => file_name.to_string(), + _ => return Err(Error::BadRequest("file name not found".into())), + }; + + let content = field.bytes().await.map_err(|err| { + tracing::error!(error = ?err,"could not readd bytes"); + Error::BadRequest("could not readd bytes".into()) + })?; + + let path = PathBuf::from("folder").join(file_name); + ctx.storage + .as_ref() + .unwrap() + .upload(path.as_path(), &content) + .await?; + + file = Some(path); + } + + file.map_or_else(not_found, |path| { + format::json(views::upload::Response::new(path.as_path())) + }) +} +``` +## Testing + +Test storage in controller implementation, refer to the [documentation here](@/docs/testing/storage.md) \ No newline at end of file diff --git a/examples/demo/Cargo.lock b/examples/demo/Cargo.lock index c32151feb..3ff9f3ca5 100644 --- a/examples/demo/Cargo.lock +++ b/examples/demo/Cargo.lock @@ -390,7 +390,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -401,13 +401,13 @@ checksum = "b4eb2cdb97421e01129ccb49169d8279ed21e829929144f4a22a6e54ac549ca1" [[package]] name = "async-trait" -version = "0.1.74" +version = "0.1.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" +checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -469,7 +469,7 @@ dependencies = [ "axum-macros", "bytes", "futures-util", - "http", + "http 1.0.0", "http-body", "http-body-util", "hyper", @@ -478,6 +478,7 @@ dependencies = [ "matchit", "memchr", "mime", + "multer", "percent-encoding", "pin-project-lite", "rustversion", @@ -501,7 +502,7 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", + "http 1.0.0", "http-body", 
"http-body-util", "mime", @@ -523,7 +524,7 @@ dependencies = [ "bytes", "cookie", "futures-util", - "http", + "http 1.0.0", "http-body", "http-body-util", "mime", @@ -543,14 +544,14 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] name = "axum-test" -version = "14.0.0" +version = "14.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56ac99db40006a1a3fffeb381f2a78cb341dbc99d07b561e8bd119e22a2b1b0f" +checksum = "fc431b62ab307c833af24700936485eb5f9a8ac18a19347fe37dd4f7ae3dffe9" dependencies = [ "anyhow", "async-trait", @@ -558,12 +559,14 @@ dependencies = [ "axum", "bytes", "cookie", - "http", + "http 1.0.0", "http-body-util", "hyper", "hyper-util", + "mime", "pretty_assertions", "reserve-port", + "rust-multipart-rfc7578_2", "serde", "serde_json", "serde_urlencoded", @@ -588,7 +591,7 @@ dependencies = [ "cookie", "dashmap", "futures", - "http", + "http 1.0.0", "http-body", "rand", "serde", @@ -706,6 +709,7 @@ version = "0.1.0" dependencies = [ "async-trait", "axum", + "axum-test", "axum_session", "chrono", "eyre", @@ -775,7 +779,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", "syn_derive", ] @@ -1008,7 +1012,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -1345,7 +1349,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -1409,6 +1413,15 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" +[[package]] +name = "encoding_rs" +version = "0.8.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1" +dependencies = [ + "cfg-if", +] + [[package]] 
name = "equivalent" version = "1.0.1" @@ -1745,7 +1758,7 @@ checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -1886,7 +1899,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 1.0.0", "indexmap", "slab", "tokio", @@ -1990,6 +2003,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "http" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http" version = "1.0.0" @@ -2008,7 +2032,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" dependencies = [ "bytes", - "http", + "http 1.0.0", ] [[package]] @@ -2019,7 +2043,7 @@ checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" dependencies = [ "bytes", "futures-util", - "http", + "http 1.0.0", "http-body", "pin-project-lite", ] @@ -2069,15 +2093,15 @@ dependencies = [ [[package]] name = "hyper" -version = "1.0.1" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "403f9214f3e703236b221f1a9cd88ec8b4adfa5296de01ab96216361f4692f56" +checksum = "fb5aa53871fc917b1a9ed87b683a5d86db645e23acb32c2e0785a353e522fb75" dependencies = [ "bytes", "futures-channel", "futures-util", "h2", - "http", + "http 1.0.0", "http-body", "httparse", "httpdate", @@ -2096,7 +2120,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "http", + "http 1.0.0", "http-body", "hyper", "pin-project-lite", @@ -2215,7 +2239,7 @@ checksum = "ce243b1bfa62ffc028f1cc3b6034ec63d649f3031bc8a4fbbb004e1ac17d1f68" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -2303,6 +2327,15 @@ dependencies 
= [ "either", ] +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.9" @@ -2457,7 +2490,7 @@ name = "loco-macros" version = "0.1.0" dependencies = [ "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -2488,6 +2521,7 @@ dependencies = [ "lazy_static", "lettre", "mime", + "object_store", "rand", "regex", "requestty", @@ -2617,6 +2651,24 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "multer" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http 0.2.11", + "httparse", + "log", + "memchr", + "mime", + "spin 0.9.8", + "version_check", +] + [[package]] name = "nix" version = "0.27.1" @@ -2741,6 +2793,27 @@ dependencies = [ "memchr", ] +[[package]] +name = "object_store" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d139f545f64630e2e3688fd9f81c470888ab01edeb72d13b4e86c566f1130000" +dependencies = [ + "async-trait", + "bytes", + "chrono", + "futures", + "humantime", + "itertools 0.12.1", + "parking_lot", + "percent-encoding", + "snafu", + "tokio", + "tracing", + "url", + "walkdir", +] + [[package]] name = "once_cell" version = "1.18.0" @@ -2793,7 +2866,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -2913,7 +2986,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -2982,7 +3055,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -3140,9 
+3213,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.70" +version = "1.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39278fbbf5fb4f646ce651690877f89d1c5811a3d4acb27700c1cb3cdb78fd3b" +checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" dependencies = [ "unicode-ident", ] @@ -3178,9 +3251,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.33" +version = "1.0.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" +checksum = "291ec9ab5efd934aaf503a6466c5d5251535d108ee747472c3977cc5acc868ef" dependencies = [ "proc-macro2", ] @@ -3497,10 +3570,26 @@ dependencies = [ "regex", "relative-path", "rustc_version", - "syn 2.0.39", + "syn 2.0.48", "unicode-ident", ] +[[package]] +name = "rust-multipart-rfc7578_2" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03b748410c0afdef2ebbe3685a6a862e2ee937127cdaae623336a459451c8d57" +dependencies = [ + "bytes", + "futures-core", + "futures-util", + "http 0.2.11", + "mime", + "mime_guess", + "rand", + "thiserror", +] + [[package]] name = "rust_decimal" version = "1.33.1" @@ -3669,7 +3758,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -3727,7 +3816,7 @@ dependencies = [ "proc-macro2", "quote", "sea-bae", - "syn 2.0.39", + "syn 2.0.48", "unicode-ident", ] @@ -3791,7 +3880,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", "thiserror", ] @@ -3865,7 +3954,7 @@ checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -3964,7 +4053,7 @@ checksum = 
"91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -4250,7 +4339,7 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6b7b278788e7be4d0d29c0f39497a0eef3fba6bbc8e70d8bf7fde46edeaa9e85" dependencies = [ - "itertools", + "itertools 0.11.0", "nom", "unicode_categories", ] @@ -4535,9 +4624,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.39" +version = "2.0.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a" +checksum = "0f3531638e407dfc0814761abb7c00a5b54992b849452a0646b7f65c9f770f3f" dependencies = [ "proc-macro2", "quote", @@ -4553,7 +4642,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -4642,7 +4731,7 @@ checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -4712,9 +4801,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.34.0" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0c014766411e834f7af5b8f4cf46257aab4036ca95e9d2c144a10f59ad6f5b9" +checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" dependencies = [ "backtrace", "bytes", @@ -4737,7 +4826,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -4823,7 +4912,7 @@ dependencies = [ "bitflags 2.4.1", "bytes", "futures-util", - "http", + "http 1.0.0", "http-body", "http-body-util", "http-range-header", @@ -4871,7 +4960,7 @@ checksum = 
"34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] @@ -5029,7 +5118,7 @@ checksum = "fea2a4c80deb4fb3ca51f66b5e2dd91e3642bbce52234bcf22e41668281208e4" dependencies = [ "proc-macro-hack", "quote", - "syn 2.0.39", + "syn 2.0.48", "unic-langid-impl", ] @@ -5304,7 +5393,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", "wasm-bindgen-shared", ] @@ -5338,7 +5427,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -5665,7 +5754,7 @@ checksum = "dbbad221e3f78500350ecbd7dfa4e63ef945c05f4c61cb7f4d3f84cd0bba649b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.48", ] [[package]] diff --git a/examples/demo/Cargo.toml b/examples/demo/Cargo.toml index de77798e0..d5665e463 100644 --- a/examples/demo/Cargo.toml +++ b/examples/demo/Cargo.toml @@ -28,7 +28,7 @@ sea-orm = { version = "0.12.4", features = [ "macros", ] } -axum = "0.7.1" +axum = { version = "0.7.1", features = ["multipart"] } axum_session = { version = "0.10.1", default-features = false } include_dir = "0.7" @@ -51,3 +51,4 @@ loco-rs = { version = "*", path = "../../", features = ["testing"] } trycmd = "0.14.19" insta = { version = "1.34.0", features = ["redactions", "yaml", "filters"] } loco-macros = { path = "../../macros" } +axum-test = { version = "14.3.0" } diff --git a/examples/demo/src/app.rs b/examples/demo/src/app.rs index 2cba1684b..ba2caa259 100644 --- a/examples/demo/src/app.rs +++ b/examples/demo/src/app.rs @@ -4,9 +4,11 @@ use async_trait::async_trait; use loco_rs::{ app::{AppContext, Hooks, Initializer}, boot::{create_app, BootResult, StartMode}, + config::Config, controller::AppRoutes, db::{self, truncate_table}, environment::Environment, + storage::{self, Storage}, task::Tasks, 
worker::{AppWorker, Processor}, Result, @@ -53,12 +55,27 @@ impl Hooks for App { .add_route(controllers::mysession::routes()) .add_route(controllers::dashboard::routes()) .add_route(controllers::user::routes()) + .add_route(controllers::upload::routes()) } async fn boot(mode: StartMode, environment: &Environment) -> Result { create_app::(mode, environment).await } + async fn storage( + _config: &Config, + environment: &Environment, + ) -> Result> { + let store = if environment == &Environment::Test { + storage::drivers::mem::new() + } else { + storage::drivers::local::new_with_prefix("storage-uploads").map_err(Box::from)? + }; + + let storage = Storage::single(store); + return Ok(Some(storage)); + } + fn connect_workers<'a>(p: &'a mut Processor, ctx: &'a AppContext) { p.register(DownloadWorker::build(ctx)); } diff --git a/examples/demo/src/controllers/mod.rs b/examples/demo/src/controllers/mod.rs index 42e7901bd..37c25f2ce 100644 --- a/examples/demo/src/controllers/mod.rs +++ b/examples/demo/src/controllers/mod.rs @@ -2,4 +2,5 @@ pub mod auth; pub mod dashboard; pub mod mysession; pub mod notes; +pub mod upload; pub mod user; diff --git a/examples/demo/src/controllers/upload.rs b/examples/demo/src/controllers/upload.rs new file mode 100644 index 000000000..231d6738f --- /dev/null +++ b/examples/demo/src/controllers/upload.rs @@ -0,0 +1,52 @@ +use std::path::PathBuf; + +use axum::extract::Multipart; +use loco_rs::prelude::*; + +use crate::views; + +/// File upload example +/// +/// ## Request Example +/// +/// curl -H "Content-Type: multipart/form-data" -F "file=@./test-2.json" +/// 127.0.0.1:3000/upload/file +async fn upload_file( + State(ctx): State, + mut multipart: Multipart, +) -> Result> { + let mut file = None; + while let Some(field) = multipart.next_field().await.map_err(|err| { + tracing::error!(error = ?err,"could not readd multipart"); + Error::BadRequest("could not readd multipart".into()) + })? 
{ + let file_name = match field.file_name() { + Some(file_name) => file_name.to_string(), + _ => return Err(Error::BadRequest("file name not found".into())), + }; + + let content = field.bytes().await.map_err(|err| { + tracing::error!(error = ?err,"could not readd bytes"); + Error::BadRequest("could not readd bytes".into()) + })?; + + let path = PathBuf::from("folder").join(file_name); + ctx.storage + .as_ref() + .unwrap() + .upload(path.as_path(), &content) + .await?; + + file = Some(path); + } + + file.map_or_else(not_found, |path| { + format::json(views::upload::Response::new(path.as_path())) + }) +} + +pub fn routes() -> Routes { + Routes::new() + .prefix("upload") + .add("/file", post(upload_file)) +} diff --git a/examples/demo/src/views/mod.rs b/examples/demo/src/views/mod.rs index 1597ab4fe..ffc45f8dd 100644 --- a/examples/demo/src/views/mod.rs +++ b/examples/demo/src/views/mod.rs @@ -1,4 +1,5 @@ pub mod auth; pub mod dashboard; pub mod notes; +pub mod upload; pub mod user; diff --git a/examples/demo/src/views/upload.rs b/examples/demo/src/views/upload.rs new file mode 100644 index 000000000..e6b7c0026 --- /dev/null +++ b/examples/demo/src/views/upload.rs @@ -0,0 +1,17 @@ +use std::path::{Path, PathBuf}; + +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Deserialize, Serialize)] +pub struct Response { + pub path: PathBuf, +} + +impl Response { + #[must_use] + pub fn new(path: &Path) -> Self { + Self { + path: path.to_path_buf(), + } + } +} diff --git a/examples/demo/storage-uploads/.gitignore b/examples/demo/storage-uploads/.gitignore new file mode 100644 index 000000000..c96a04f00 --- /dev/null +++ b/examples/demo/storage-uploads/.gitignore @@ -0,0 +1,2 @@ +* +!.gitignore \ No newline at end of file diff --git a/examples/demo/tests/cli_tests.rs b/examples/demo/tests/cli_tests.rs index 74835d443..b5269d0ed 100644 --- a/examples/demo/tests/cli_tests.rs +++ b/examples/demo/tests/cli_tests.rs @@ -1,5 +1,3 @@ -use std::env; - #[test] fn cli_tests() { let 
t = trycmd::TestCases::new(); diff --git a/examples/demo/tests/cmd/cli.trycmd b/examples/demo/tests/cmd/cli.trycmd index 7b1d25e40..bcd17e18a 100644 --- a/examples/demo/tests/cmd/cli.trycmd +++ b/examples/demo/tests/cmd/cli.trycmd @@ -95,6 +95,7 @@ $ blo-cli routes --environment test [GET] /notes/:id [DELETE] /notes/:id [POST] /notes/:id +[POST] /upload/file [GET] /user/current [GET] /user/current_api_key diff --git a/examples/demo/tests/requests/mod.rs b/examples/demo/tests/requests/mod.rs index 81ed68f96..c6835e7bb 100644 --- a/examples/demo/tests/requests/mod.rs +++ b/examples/demo/tests/requests/mod.rs @@ -1,4 +1,5 @@ mod auth; mod notes; mod prepare_data; +mod upload; mod user; diff --git a/examples/demo/tests/requests/upload.rs b/examples/demo/tests/requests/upload.rs new file mode 100644 index 000000000..cb5cef468 --- /dev/null +++ b/examples/demo/tests/requests/upload.rs @@ -0,0 +1,26 @@ +use axum_test::multipart::{MultipartForm, Part}; +use blo::{app::App, views}; +use loco_rs::testing; +use serial_test::serial; + +#[tokio::test] +#[serial] +async fn can_register() { + testing::request::(|request, ctx| async move { + let file_content = "loco file upload"; + let file_part = Part::bytes(file_content.as_bytes()).file_name("loco.txt"); + + let multipart_form = MultipartForm::new().add_part("file", file_part); + + let response = request.post("/upload/file").multipart(multipart_form).await; + + response.assert_status_ok(); + + let res: views::upload::Response = serde_json::from_str(&response.text()).unwrap(); + + let stored_file: String = ctx.storage.unwrap().download(&res.path).await.unwrap(); + + assert_eq!(stored_file, file_content); + }) + .await; +} diff --git a/src/app.rs b/src/app.rs index 29cdfe226..1a71e468b 100644 --- a/src/app.rs +++ b/src/app.rs @@ -18,6 +18,7 @@ use crate::{ controller::AppRoutes, environment::Environment, mailer::EmailSender, + storage::Storage, task::Tasks, worker::{Pool, Processor, RedisConnectionManager}, Result, @@ -43,6 +44,8 
@@ pub struct AppContext { pub config: Config, /// An optional email sender component that can be used to send email. pub mailer: Option, + // An optional storage instance for the application + pub storage: Option, } /// A trait that defines hooks for customizing and extending the behavior of a @@ -150,6 +153,14 @@ pub trait Hooks { /// Defines the application's routing configuration. fn routes(_ctx: &AppContext) -> AppRoutes; + /// Defines the storage configuration for the application + async fn storage( + _config: &config::Config, + _environment: &Environment, + ) -> Result> { + Ok(None) + } + #[cfg(feature = "channels")] /// Register channels endpoints to the application routers fn register_channels(_ctx: &AppContext) -> AppChannels; diff --git a/src/boot.rs b/src/boot.rs index 772dfa639..b65fa3db8 100644 --- a/src/boot.rs +++ b/src/boot.rs @@ -202,6 +202,7 @@ pub async fn create_context(environment: &Environment) -> Result), diff --git a/src/lib.rs b/src/lib.rs index 6115e9978..7f7a1d28d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -62,6 +62,7 @@ pub mod task; pub mod testing; #[cfg(feature = "testing")] pub use axum_test::TestServer; +pub mod storage; pub mod validation; pub mod worker; #[cfg(feature = "channels")] diff --git a/src/storage/contents.rs b/src/storage/contents.rs new file mode 100644 index 000000000..a494a4a37 --- /dev/null +++ b/src/storage/contents.rs @@ -0,0 +1,37 @@ +use bytes::Bytes; + +pub struct Contents { + data: Bytes, +} + +impl From for Contents { + /// Converts a `Vec` into a `Contents` instance. + /// + /// # Returns + /// + /// Returns a `Contents` instance with the provided byte data. + fn from(data: Bytes) -> Self { + Self { data } + } +} + +impl From for Vec { + /// Convert `Contents` instance into a Vec Self { + contents.data.to_vec() + } +} + +impl TryFrom for String { + type Error = std::string::FromUtf8Error; + + /// Tries to convert a `Contents` instance into a `String`. 
+ /// + /// # Returns + /// + /// Returns a `Result` containing a `String` with the UTF-8 representation + /// of the byte data, or an error if the conversion fails. + fn try_from(contents: Contents) -> Result { + Self::from_utf8(contents.data.to_vec()) + } +} diff --git a/src/storage/drivers/aws.rs b/src/storage/drivers/aws.rs new file mode 100644 index 000000000..ea42c0d03 --- /dev/null +++ b/src/storage/drivers/aws.rs @@ -0,0 +1,72 @@ +#[cfg(test)] +use core::time::Duration; +use std::sync::Arc; + +use object_store::{ + aws::{AmazonS3Builder, AwsCredential}, + ObjectStore, StaticCredentialProvider, +}; +#[cfg(test)] +use object_store::{BackoffConfig, RetryConfig}; + +use super::Store; +use crate::Result; + +/// A set of AWS security credentials +pub struct Credential { + /// AWS_ACCESS_KEY_ID + pub key_id: String, + /// AWS_SECRET_ACCESS_KEY + pub secret_key: String, + /// AWS_SESSION_TOKEN + pub token: Option, +} +/// Create new AWS s3 storage with bucket and region. +/// +/// # Errors +/// +/// When could not initialize the client instance +pub fn new(bucket_name: &str, region: &str) -> Result { + let s3 = AmazonS3Builder::new() + .with_bucket_name(bucket_name) + .with_region(region) + .build() + .map_err(Box::from)?; + + Ok(Store::new((Box::new(s3) as Box).into())) +} + +/// Create new AWS s3 storage with bucket, region and credentials. 
+/// +/// # Errors +/// +/// When could not initialize the client instance +pub fn with_credentials(bucket_name: &str, region: &str, credentials: Credential) -> Result { + let s3 = AmazonS3Builder::new() + .with_bucket_name(bucket_name) + .with_region(region) + .with_credentials(Arc::new(StaticCredentialProvider::new(AwsCredential { + key_id: credentials.key_id.to_string(), + secret_key: credentials.secret_key.to_string(), + token: credentials.token, + }))) + .build() + .map_err(Box::from)?; + + Ok(Store::new((Box::new(s3) as Box).into())) +} + +#[cfg(test)] +pub fn with_failure() -> Store { + let s3 = AmazonS3Builder::new() + .with_bucket_name("loco-test") + .with_retry(RetryConfig { + backoff: BackoffConfig::default(), + max_retries: 0, + retry_timeout: Duration::from_secs(0), + }) + .build() + .unwrap(); + + Store::new((Box::new(s3) as Box).into()) +} diff --git a/src/storage/drivers/azure.rs b/src/storage/drivers/azure.rs new file mode 100644 index 000000000..731757949 --- /dev/null +++ b/src/storage/drivers/azure.rs @@ -0,0 +1,20 @@ +use object_store::{azure::MicrosoftAzureBuilder, ObjectStore}; + +use super::Store; +use crate::Result; + +/// Create new Azure storage. +/// +/// # Errors +/// +/// When could not initialize the client instance +pub fn new(container_name: &str, account_name: &str, access_key: &str) -> Result { + let azure = MicrosoftAzureBuilder::new() + .with_container_name(container_name) + .with_account(account_name) + .with_access_key(access_key) + .build() + .map_err(Box::from)?; + + Ok(Store::new((Box::new(azure) as Box).into())) +} diff --git a/src/storage/drivers/gcp.rs b/src/storage/drivers/gcp.rs new file mode 100644 index 000000000..7f48066a7 --- /dev/null +++ b/src/storage/drivers/gcp.rs @@ -0,0 +1,20 @@ +use object_store::{gcp::GoogleCloudStorageBuilder, ObjectStore}; + +use super::Store; +use crate::Result; + +/// Create new GCP storage. 
+/// +/// # Errors +/// +/// When could not initialize the client instance +pub fn new(bucket_name: &str, service_account_key: &str, service_account: &str) -> Result { + let gcs = GoogleCloudStorageBuilder::new() + .with_bucket_name(bucket_name) + .with_service_account_key(service_account_key) + .with_service_account_path(service_account) + .build() + .map_err(Box::from)?; + + Ok(Store::new((Box::new(gcs) as Box).into())) +} diff --git a/src/storage/drivers/local.rs b/src/storage/drivers/local.rs new file mode 100644 index 000000000..3400d9776 --- /dev/null +++ b/src/storage/drivers/local.rs @@ -0,0 +1,23 @@ +use object_store::{local::LocalFileSystem, ObjectStore}; + +use super::Store; +use crate::Result; + +/// Create new filesystem storage with no prefix +#[must_use] +pub fn new() -> Store { + Store::new((Box::new(LocalFileSystem::new()) as Box).into()) +} + +/// Create new filesystem storage with `prefix` applied to all paths +/// +/// # Errors +/// +/// Returns an error if the path does not exist +pub fn new_with_prefix(prefix: impl AsRef) -> Result { + Ok(Store::new( + (Box::new(LocalFileSystem::new_with_prefix(prefix).map_err(Box::from)?) + as Box) + .into(), + )) +} diff --git a/src/storage/drivers/mem.rs b/src/storage/drivers/mem.rs new file mode 100644 index 000000000..a2e6d9951 --- /dev/null +++ b/src/storage/drivers/mem.rs @@ -0,0 +1,9 @@ +use object_store::{memory::InMemory, ObjectStore}; + +use super::Store; + +/// Create new in-memory storage. 
+#[must_use] +pub fn new() -> Store { + Store::new((Box::new(InMemory::new()) as Box).into()) +} diff --git a/src/storage/drivers/mod.rs b/src/storage/drivers/mod.rs new file mode 100644 index 000000000..684323155 --- /dev/null +++ b/src/storage/drivers/mod.rs @@ -0,0 +1,100 @@ +use std::{path::Path, sync::Arc}; + +use bytes::Bytes; +#[cfg(feature = "storage_aws_s3")] +pub mod aws; +#[cfg(feature = "storage_azure")] +pub mod azure; +#[cfg(feature = "storage_gcp")] +pub mod gcp; +pub mod local; +pub mod mem; +pub use object_store; +use object_store::ObjectStore; + +use super::error::StoreResult; + +#[derive(Clone)] +pub struct Store { + driver: Arc, +} + +impl Store { + /// Constructor for creating a new `Store` instance. + pub fn new(driver: Arc) -> Self { + Self { driver } + } +} + +impl Store { + /// Uploads the content represented by `Bytes` to the specified path in the + /// object store. + /// + /// # Errors + /// + /// Returns a `StoreResult` with the result of the upload operation. + pub async fn upload( + &self, + path: &Path, + content: &Bytes, + ) -> StoreResult { + let path = object_store::path::Path::from(path.display().to_string()); + Ok(self.driver.put(&path, content.clone()).await?) + } + + /// Retrieves the content from the specified path in the object store. + /// + /// # Errors + /// + /// Returns a `StoreResult` with the result of the retrieval operation. + pub async fn get(&self, path: &Path) -> StoreResult { + let path = object_store::path::Path::from(path.display().to_string()); + Ok(self.driver.get(&path).await?) + } + + /// Deletes the content at the specified path in the object store. + /// + /// # Errors + /// + /// Returns a `StoreResult` indicating the success of the deletion + /// operation. + pub async fn delete(&self, path: &Path) -> StoreResult<()> { + let path = object_store::path::Path::from(path.display().to_string()); + Ok(self.driver.delete(&path).await?) 
+ } + + /// Renames or moves the content from one path to another in the object + /// store. + /// + /// # Errors + /// + /// Returns a `StoreResult` indicating the success of the rename/move + /// operation. + pub async fn rename(&self, from: &Path, to: &Path) -> StoreResult<()> { + let from = object_store::path::Path::from(from.display().to_string()); + let to = object_store::path::Path::from(to.display().to_string()); + Ok(self.driver.rename(&from, &to).await?) + } + + /// Copies the content from one path to another in the object store. + /// + /// # Errors + /// + /// Returns a `StoreResult` indicating the success of the copy operation. + pub async fn copy(&self, from: &Path, to: &Path) -> StoreResult<()> { + let from = object_store::path::Path::from(from.display().to_string()); + let to = object_store::path::Path::from(to.display().to_string()); + Ok(self.driver.copy(&from, &to).await?) + } + + /// Checks if the content exists at the specified path in the object store. + /// + /// # Errors + /// + /// Returns a `StoreResult` with a boolean indicating the existence of the + /// content. 
+ pub async fn exists(&self, path: &Path) -> StoreResult { + let path = object_store::path::Path::from(path.display().to_string()); + Ok(self.driver.get(&path).await.is_ok()) + } +} diff --git a/src/storage/error.rs b/src/storage/error.rs new file mode 100644 index 000000000..72e806e35 --- /dev/null +++ b/src/storage/error.rs @@ -0,0 +1,29 @@ +use std::{collections::BTreeMap, path::PathBuf}; + +use object_store::Error; + +#[derive(thiserror::Error, Debug)] +#[allow(clippy::module_name_repetitions)] +pub enum StoreError { + #[error(transparent)] + Storage(#[from] Error), + + #[error("Unable to read data from file {}", path.display().to_string())] + UnableToReadBytes { path: PathBuf }, +} + +#[derive(thiserror::Error, Debug)] +#[allow(clippy::module_name_repetitions)] +pub enum StorageError { + #[error("store not found by the given key: {0}")] + StoreNotFound(String), + + #[error(transparent)] + Storage(#[from] StoreError), + + #[error("secondaries errors")] + Multi(BTreeMap), +} + +pub type StoreResult = std::result::Result; +pub type StorageResult = std::result::Result; diff --git a/src/storage/mod.rs b/src/storage/mod.rs new file mode 100644 index 000000000..a0ba66b43 --- /dev/null +++ b/src/storage/mod.rs @@ -0,0 +1,231 @@ +//! # Storage Module +//! +//! This module defines a generic storage abstraction represented by the +//! [`Storage`] struct. It provides methods for performing common storage +//! operations such as upload, download, delete, rename, and copy. +//! +//! ## Storage Strategy +//! +//! The [`Storage`] struct is designed to work with different storage +//! strategies. A storage strategy defines the behavior of the storage +//! operations. Strategies implement the [`strategies::StorageStrategyTrait`]. +//! The selected strategy can be dynamically changed at runtime. 
+mod contents; +pub mod drivers; +pub mod error; +pub mod strategies; +use std::{collections::BTreeMap, path::Path, sync::Arc}; + +use bytes::Bytes; + +use self::error::StorageResult; + +#[derive(Clone)] +pub struct Storage { + pub stores: BTreeMap, + pub strategy: Arc, +} + +impl Storage { + /// Creates a new storage instance with a single store and the default + /// strategy. + #[must_use] + pub fn single(store: drivers::Store) -> Self { + let default_key = "store"; + Self { + strategy: Arc::new(strategies::single::SingleStrategy::new(default_key)), + stores: BTreeMap::from([(default_key.to_string(), store)]), + } + } + + /// Creates a new storage instance with the provided stores and strategy. + #[must_use] + pub fn new( + stores: BTreeMap, + strategy: Arc, + ) -> Self { + Self { stores, strategy } + } + + /// Uploads content to the storage at the specified path. + /// + /// This method uses the selected strategy for the upload operation. + /// + /// # Errors + /// + /// This method returns an error if the upload operation fails or if there + /// is an issue with the strategy configuration. + pub async fn upload(&self, path: &Path, content: &Bytes) -> error::StorageResult<()> { + self.upload_with_strategy(path, content, &self.strategy) + .await + } + + /// Uploads content to the storage at the specified path using a specific + /// strategy. + /// + /// This method allows specifying a custom strategy for the upload + /// operation. + /// + /// # Errors + /// + /// This method returns an error if the upload operation fails or if there + /// is an issue with the strategy configuration. + pub async fn upload_with_strategy( + &self, + path: &Path, + content: &Bytes, + strategy: &Arc, + ) -> error::StorageResult<()> { + strategy.upload(self, path, content).await + } + + /// Downloads content from the storage at the specified path. + /// + /// This method uses the selected strategy for the download operation. 
+ /// + /// # Errors + /// + /// This method returns an error if the download operation fails or if there + /// is an issue with the strategy configuration. + pub async fn download>( + &self, + path: &Path, + ) -> error::StorageResult { + self.download_with_policy(path, &self.strategy).await + } + + /// Downloads content from the storage at the specified path using a + /// specific strategy. + /// + /// This method allows specifying a custom strategy for the download + /// operation. + /// + /// # Errors + /// + /// This method returns an error if the download operation fails or if there + /// is an issue with the strategy configuration. + pub async fn download_with_policy>( + &self, + path: &Path, + strategy: &Arc, + ) -> error::StorageResult { + let res = strategy.download(self, path).await?; + contents::Contents::from(res).try_into().map_or_else( + |_| { + Err(error::StorageError::Storage( + error::StoreError::UnableToReadBytes { + path: path.to_path_buf(), + }, + )) + }, + |content| Ok(content), + ) + } + + /// Deletes content from the storage at the specified path. + /// + /// This method uses the selected strategy for the delete operation. + /// + /// # Errors + /// + /// This method returns an error if the delete operation fails or if there + /// is an issue with the strategy configuration. + pub async fn delete(&self, path: &Path) -> error::StorageResult<()> { + self.delete_with_policy(path, &self.strategy).await + } + + /// Deletes content from the storage at the specified path using a specific + /// strategy. + /// + /// This method allows specifying a custom strategy for the delete + /// operation. + /// + /// # Errors + /// + /// This method returns an error if the delete operation fails or if there + /// is an issue with the strategy configuration. 
+ pub async fn delete_with_policy( + &self, + path: &Path, + strategy: &Arc, + ) -> error::StorageResult<()> { + strategy.delete(self, path).await + } + + /// Renames content from one path to another in the storage. + /// + /// This method uses the selected strategy for the rename operation. + /// + /// # Errors + /// + /// This method returns an error if the rename operation fails or if there + /// is an issue with the strategy configuration. + pub async fn rename(&self, from: &Path, to: &Path) -> error::StorageResult<()> { + self.rename_with_policy(from, to, &self.strategy).await + } + + /// Renames content from one path to another in the storage using a specific + /// strategy. + /// + /// This method allows specifying a custom strategy for the rename + /// operation. + /// + /// # Errors + /// + /// This method returns an error if the rename operation fails or if there + /// is an issue with the strategy configuration. + pub async fn rename_with_policy( + &self, + from: &Path, + to: &Path, + strategy: &Arc, + ) -> error::StorageResult<()> { + strategy.rename(self, from, to).await + } + + /// Copies content from one path to another in the storage. + /// + /// This method uses the selected strategy for the copy operation. + /// + /// # Errors + /// + /// This method returns an error if the copy operation fails or if there is + /// an issue with the strategy configuration. + pub async fn copy(&self, from: &Path, to: &Path) -> error::StorageResult<()> { + self.copy_with_policy(from, to, &self.strategy).await + } + + /// Copies content from one path to another in the storage using a specific + /// strategy. + /// + /// This method allows specifying a custom strategy for the copy operation. + /// + /// # Errors + /// + /// This method returns an error if the copy operation fails or if there is + /// an issue with the strategy configuration. 
+ pub async fn copy_with_policy( + &self, + from: &Path, + to: &Path, + strategy: &Arc, + ) -> error::StorageResult<()> { + strategy.copy(self, from, to).await + } + + /// Returns a reference to the store with the specified name if exists. + #[must_use] + pub fn as_store(&self, name: &str) -> Option<&drivers::Store> { + self.stores.get(name) + } + + /// Returns a reference to the store with the specified name. + /// + /// # Errors + /// + /// Return an error if the given store name not exists + pub fn as_store_err(&self, name: &str) -> StorageResult<&drivers::Store> { + self.as_store(name) + .ok_or(error::StorageError::StoreNotFound(name.to_string())) + } +} diff --git a/src/storage/strategies/backup.rs b/src/storage/strategies/backup.rs new file mode 100644 index 000000000..7ebb705d2 --- /dev/null +++ b/src/storage/strategies/backup.rs @@ -0,0 +1,1127 @@ +//! # `BackupStrategy` Implementation for Storage Strategies +//! +//! This module provides an implementation of the [`StorageStrategyTrait`] for +//! the [`BackupStrategy`]. The [`BackupStrategy`] is designed to mirror storage +//! operations. +//! +//! ## Strategy Description per operation +//! +//! * `upload`/`delete`/`rename`/`copy`: The primary storage must succeed in the +//! given operation. If there is any failure with the primary storage, this +//! function returns an error. When +//! * [`FailureMode::BackupAll`] is given - all the secondary storages must +//! succeed. If there is one failure in the backup, the operation continues +//! to the rest but returns an error. +//! * [`FailureMode::AllowBackupFailure`] is given - the operation does not +//! return an error when one or more mirror operations fail. +//! * [`FailureMode::AtLeastOneFailure`] is given - at least one operation +//! should pass. +//! * [`FailureMode::CountFailure`] is given - the number of the given backup +//! should pass. +//! +//! * `download`: Initiates the download of the given path only from primary +//! storage. 
+use std::{collections::BTreeMap, path::Path}; + +use bytes::Bytes; + +use crate::storage::{ + error::{StorageError, StorageResult, StoreError}, + strategies::StorageStrategyTrait, + Storage, +}; + +/// Enum representing the failure mode for the [`BackupStrategy`]. +#[derive(Clone)] +pub enum FailureMode { + /// Fail if any secondary storage backend encounters an error. + BackupAll, + /// Allow errors from secondary storage backup without failing. + AllowBackupFailure, + /// Allow only one backup failure from secondary storage backup without + /// failing. + AtLeastOneFailure, + /// Allow the given number of backups to fail from secondary storage backup + /// without failing. + CountFailure(usize), +} + +/// Represents the Backup Strategy for storage operations. +#[derive(Clone)] +pub struct BackupStrategy { + pub primary: String, + pub secondaries: Option>, + pub failure_mode: FailureMode, +} + +#[async_trait::async_trait] +impl StorageStrategyTrait for BackupStrategy { + /// Uploads content to the primary and, if configured, secondary storage + /// backends. + /// # Errors + /// + /// Returns a [`StorageResult`] indicating success or an error depending on the + /// [`FailureMode`]. + async fn upload(&self, storage: &Storage, path: &Path, content: &Bytes) -> StorageResult<()> { + storage + .as_store_err(&self.primary)? 
+ .upload(path, content) + .await?; + + let mut collect_errors: BTreeMap = BTreeMap::new(); + if let Some(secondaries) = self.secondaries.as_ref() { + for secondary_store in secondaries { + match storage.as_store_err(secondary_store) { + Ok(store) => { + if let Err(err) = store.upload(path, content).await { + collect_errors.insert(secondary_store.to_string(), err.to_string()); + } + } + Err(err) => { + collect_errors.insert(secondary_store.to_string(), err.to_string()); + } + }; + } + } + + if self.failure_mode.should_fail(&collect_errors) { + return Err(StorageError::Multi(collect_errors)); + } + + Ok(()) + } + + /// Downloads content only from primary storage backend. + async fn download(&self, storage: &Storage, path: &Path) -> StorageResult { + let store = storage.as_store_err(&self.primary)?; + Ok(store + .get(path) + .await? + .bytes() + .await + .map_err(|e| StorageError::Storage(StoreError::Storage(e)))?) + } + + /// Deletes content from the primary and, if configured, secondary storage + /// backends. + /// + /// # Errors + /// + /// Returns a [`StorageResult`] indicating success or an error depend of the + /// [`FailureMode`]. + async fn delete(&self, storage: &Storage, path: &Path) -> StorageResult<()> { + storage.as_store_err(&self.primary)?.delete(path).await?; + + let mut collect_errors: BTreeMap = BTreeMap::new(); + if let Some(secondaries) = self.secondaries.as_ref() { + for secondary_store in secondaries { + match storage.as_store_err(secondary_store) { + Ok(store) => { + if let Err(err) = store.delete(path).await { + collect_errors.insert(secondary_store.to_string(), err.to_string()); + } + } + Err(err) => { + collect_errors.insert(secondary_store.to_string(), err.to_string()); + } + }; + } + } + + if self.failure_mode.should_fail(&collect_errors) { + return Err(StorageError::Multi(collect_errors)); + } + + Ok(()) + } + + /// Renames content on the primary and, if configured, secondary storage + /// backends. 
+ /// + /// # Errors + /// + /// Returns a [`StorageResult`] indicating success or an error depend of the + /// [`FailureMode`]. + async fn rename(&self, storage: &Storage, from: &Path, to: &Path) -> StorageResult<()> { + storage + .as_store_err(&self.primary)? + .rename(from, to) + .await?; + + let mut collect_errors: BTreeMap = BTreeMap::new(); + if let Some(secondaries) = self.secondaries.as_ref() { + for secondary_store in secondaries { + match storage.as_store_err(secondary_store) { + Ok(store) => { + if let Err(err) = store.rename(from, to).await { + collect_errors.insert(secondary_store.to_string(), err.to_string()); + } + } + Err(err) => { + collect_errors.insert(secondary_store.to_string(), err.to_string()); + } + }; + } + } + + if self.failure_mode.should_fail(&collect_errors) { + return Err(StorageError::Multi(collect_errors)); + } + + Ok(()) + } + + /// Copies content from the primary and, if configured, secondary storage + /// backends. + /// + /// # Errors + /// + /// Returns a [`StorageResult`] indicating success or an error depend of the + /// [`FailureMode`]. + async fn copy(&self, storage: &Storage, from: &Path, to: &Path) -> StorageResult<()> { + storage.as_store_err(&self.primary)?.copy(from, to).await?; + + let mut collect_errors: BTreeMap = BTreeMap::new(); + if let Some(secondaries) = self.secondaries.as_ref() { + for secondary_store in secondaries { + match storage.as_store_err(secondary_store) { + Ok(store) => { + if let Err(err) = store.copy(from, to).await { + collect_errors.insert(secondary_store.to_string(), err.to_string()); + } + } + Err(err) => { + collect_errors.insert(secondary_store.to_string(), err.to_string()); + } + }; + } + } + + if self.failure_mode.should_fail(&collect_errors) { + return Err(StorageError::Multi(collect_errors)); + } + + Ok(()) + } +} + +impl BackupStrategy { + /// Creates a new instance of [`BackupStrategy`]. 
+ #[must_use] + pub fn new(primary: &str, secondaries: Option>, failure_mode: FailureMode) -> Self { + Self { + primary: primary.to_string(), + secondaries, + failure_mode, + } + } +} + +impl FailureMode { + #[must_use] + pub fn should_fail(&self, errors: &BTreeMap) -> bool { + match self { + Self::BackupAll => !errors.is_empty(), + Self::AllowBackupFailure => false, + Self::AtLeastOneFailure => errors.len() > 1, + Self::CountFailure(count) => count <= &errors.len(), + } + } +} + +#[cfg(test)] +mod tests { + + use std::{collections::BTreeMap, path::PathBuf}; + + use super::*; + use crate::storage::{drivers, Storage}; + + // Upload + + #[tokio::test] + async fn upload_should_pass_when_backup_all_policy() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::mem::new(); + let store_3 = drivers::mem::new(); + + let strategy: Box = Box::new(BackupStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::BackupAll, + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let path = PathBuf::from("users").join("data").join("1.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage.upload(path.as_path(), &file_content).await.is_ok()); + + assert!(store_1.exists(path.as_path()).await.unwrap()); + assert!(store_2.exists(path.as_path()).await.unwrap()); + assert!(store_3.exists(path.as_path()).await.unwrap()); + } + + #[tokio::test] + async fn upload_should_fail_when_primary_fail() { + let store_1 = drivers::aws::with_failure(); + let store_2 = drivers::mem::new(); + let store_3 = drivers::mem::new(); + + let strategy: Box = Box::new(BackupStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::BackupAll, + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + 
("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let path = PathBuf::from("users").join("data").join("1.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage.upload(path.as_path(), &file_content).await.is_err()); + + assert!(!store_1.exists(path.as_path()).await.unwrap()); + assert!(!store_2.exists(path.as_path()).await.unwrap()); + assert!(!store_3.exists(path.as_path()).await.unwrap()); + } + + #[tokio::test] + async fn upload_should_pass_when_allow_backup_failure_policy() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::aws::with_failure(); + let store_3 = drivers::mem::new(); + + let strategy: Box = Box::new(BackupStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::AllowBackupFailure, + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let path = PathBuf::from("users").join("data").join("1.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage.upload(path.as_path(), &file_content).await.is_ok()); + + assert!(store_1.exists(path.as_path()).await.unwrap()); + assert!(!store_2.exists(path.as_path()).await.unwrap()); + assert!(store_3.exists(path.as_path()).await.unwrap()); + } + + #[tokio::test] + async fn upload_should_pass_when_at_least_one_failure_policy() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::aws::with_failure(); + let store_3 = drivers::mem::new(); + + let strategy: Box = Box::new(BackupStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::AtLeastOneFailure, + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), 
store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let path = PathBuf::from("users").join("data").join("1.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage.upload(path.as_path(), &file_content).await.is_ok()); + + assert!(store_1.exists(path.as_path()).await.unwrap()); + assert!(!store_2.exists(path.as_path()).await.unwrap()); + assert!(store_3.exists(path.as_path()).await.unwrap()); + } + + #[tokio::test] + async fn upload_should_fail_when_at_least_one_failure_policy() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::aws::with_failure(); + let store_3 = drivers::aws::with_failure(); + + let strategy: Box = Box::new(BackupStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::CountFailure(2), + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let path = PathBuf::from("users").join("data").join("1.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage.upload(path.as_path(), &file_content).await.is_err()); + + assert!(store_1.exists(path.as_path()).await.unwrap()); + assert!(!store_2.exists(path.as_path()).await.unwrap()); + assert!(!store_3.exists(path.as_path()).await.unwrap()); + } + + #[tokio::test] + async fn upload_should_pass_count_fail_policy_should_pass() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::aws::with_failure(); + let store_3 = drivers::mem::new(); + + let strategy: Box = Box::new(BackupStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::CountFailure(2), + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), 
+ strategy.into(), + ); + + let path = PathBuf::from("users").join("data").join("1.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage.upload(path.as_path(), &file_content).await.is_ok()); + + assert!(store_1.exists(path.as_path()).await.unwrap()); + assert!(!store_2.exists(path.as_path()).await.unwrap()); + assert!(store_3.exists(path.as_path()).await.unwrap()); + } + + #[tokio::test] + async fn upload_should_fail_when_count_fail_should_fail() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::aws::with_failure(); + let store_3 = drivers::aws::with_failure(); + + let strategy: Box = Box::new(BackupStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::CountFailure(2), + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let path = PathBuf::from("users").join("data").join("1.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage.upload(path.as_path(), &file_content).await.is_err()); + + assert!(store_1.exists(path.as_path()).await.unwrap()); + assert!(!store_2.exists(path.as_path()).await.unwrap()); + assert!(!store_3.exists(path.as_path()).await.unwrap()); + } + + // Download + + #[tokio::test] + async fn can_download() { + let store_1 = drivers::mem::new(); + + let strategy: Box = Box::new(BackupStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::BackupAll, + )) as Box; + + let storage = Storage::new( + BTreeMap::from([("store_1".to_string(), store_1.clone())]), + strategy.into(), + ); + + let path = PathBuf::from("users").join("data").join("1.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage.upload(path.as_path(), &file_content).await.is_err()); + + let download_file: String = 
storage.download(path.as_path()).await.unwrap(); + assert_eq!(download_file, file_content); + + assert!(store_1.delete(path.as_path()).await.is_ok()); + + let download_file: StorageResult = storage.download(path.as_path()).await; + assert!(download_file.is_err()); + } + + // Delete + + #[tokio::test] + async fn delete_should_pass_when_backup_all_policy() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::mem::new(); + let store_3 = drivers::mem::new(); + + let strategy: Box = Box::new(BackupStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::AllowBackupFailure, + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let path = PathBuf::from("users").join("data").join("1.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage.upload(path.as_path(), &file_content).await.is_ok()); + + assert!(storage.delete(path.as_path()).await.is_ok()); + + assert!(!store_1.exists(path.as_path()).await.unwrap()); + assert!(!store_2.exists(path.as_path()).await.unwrap()); + assert!(!store_3.exists(path.as_path()).await.unwrap()); + } + + // rename + #[tokio::test] + async fn rename_should_pass_when_backup_all_policy() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::mem::new(); + let store_3 = drivers::mem::new(); + + let strategy: Box = Box::new(BackupStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::BackupAll, + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let orig_path = PathBuf::from("users").join("data").join("1.txt"); + let new_path = 
PathBuf::from("data-2").join("data").join("2.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage + .upload(orig_path.as_path(), &file_content) + .await + .is_ok()); + + assert!(store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(storage + .rename(orig_path.as_path(), new_path.as_path()) + .await + .is_ok()); + + assert!(!store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(!store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(!store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_1.exists(new_path.as_path()).await.unwrap()); + assert!(store_2.exists(new_path.as_path()).await.unwrap()); + assert!(store_3.exists(new_path.as_path()).await.unwrap()); + } + + #[tokio::test] + async fn rename_should_pass_when_allow_backup_failure_policy() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::mem::new(); + let store_3 = drivers::mem::new(); + + let strategy: Box = Box::new(BackupStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::AllowBackupFailure, + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let orig_path = PathBuf::from("users").join("data").join("1.txt"); + let new_path = PathBuf::from("data-2").join("data").join("2.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage + .upload(orig_path.as_path(), &file_content) + .await + .is_ok()); + + assert!(store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_2.delete(orig_path.as_path()).await.is_ok()); + + assert!(storage + 
.rename(orig_path.as_path(), new_path.as_path()) + .await + .is_ok()); + + assert!(!store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(!store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(!store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_1.exists(new_path.as_path()).await.unwrap()); + assert!(!store_2.exists(new_path.as_path()).await.unwrap()); + assert!(store_3.exists(new_path.as_path()).await.unwrap()); + } + + #[tokio::test] + async fn rename_should_pass_when_at_least_one_failure_policy() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::mem::new(); + let store_3 = drivers::mem::new(); + + let strategy: Box = Box::new(BackupStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::AtLeastOneFailure, + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let orig_path = PathBuf::from("users").join("data").join("1.txt"); + let new_path = PathBuf::from("data-2").join("data").join("2.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage + .upload(orig_path.as_path(), &file_content) + .await + .is_ok()); + + assert!(store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_2.delete(orig_path.as_path()).await.is_ok()); + + assert!(storage + .rename(orig_path.as_path(), new_path.as_path()) + .await + .is_ok()); + + assert!(!store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(!store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(!store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_1.exists(new_path.as_path()).await.unwrap()); + assert!(!store_2.exists(new_path.as_path()).await.unwrap()); + 
assert!(store_3.exists(new_path.as_path()).await.unwrap()); + } + + #[tokio::test] + async fn rename_should_fail_when_at_least_one_failure_policy() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::mem::new(); + let store_3 = drivers::mem::new(); + + let strategy: Box = Box::new(BackupStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::AtLeastOneFailure, + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let orig_path = PathBuf::from("users").join("data").join("1.txt"); + let new_path = PathBuf::from("data-2").join("data").join("2.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage + .upload(orig_path.as_path(), &file_content) + .await + .is_ok()); + + assert!(store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_2.delete(orig_path.as_path()).await.is_ok()); + assert!(store_3.delete(orig_path.as_path()).await.is_ok()); + + assert!(storage + .rename(orig_path.as_path(), new_path.as_path()) + .await + .is_err()); + + assert!(!store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(!store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(!store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_1.exists(new_path.as_path()).await.unwrap()); + assert!(!store_2.exists(new_path.as_path()).await.unwrap()); + assert!(!store_3.exists(new_path.as_path()).await.unwrap()); + } + + #[tokio::test] + async fn rename_should_pass_when_count_fail_policy() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::mem::new(); + let store_3 = drivers::mem::new(); + + let strategy: Box = Box::new(BackupStrategy::new( + "store_1", + 
Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::CountFailure(2), + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let orig_path = PathBuf::from("users").join("data").join("1.txt"); + let new_path = PathBuf::from("data-2").join("data").join("2.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage + .upload(orig_path.as_path(), &file_content) + .await + .is_ok()); + + assert!(store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_2.delete(orig_path.as_path()).await.is_ok()); + + assert!(storage + .rename(orig_path.as_path(), new_path.as_path()) + .await + .is_ok()); + + assert!(!store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(!store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(!store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_1.exists(new_path.as_path()).await.unwrap()); + assert!(!store_2.exists(new_path.as_path()).await.unwrap()); + assert!(store_3.exists(new_path.as_path()).await.unwrap()); + } + + #[tokio::test] + async fn rename_should_fail_when_count_fail_policy() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::mem::new(); + let store_3 = drivers::mem::new(); + + let strategy: Box = Box::new(BackupStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::CountFailure(2), + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let orig_path = PathBuf::from("users").join("data").join("1.txt"); + let new_path = 
PathBuf::from("data-2").join("data").join("2.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage + .upload(orig_path.as_path(), &file_content) + .await + .is_ok()); + + assert!(store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_2.delete(orig_path.as_path()).await.is_ok()); + assert!(store_3.delete(orig_path.as_path()).await.is_ok()); + + assert!(storage + .rename(orig_path.as_path(), new_path.as_path()) + .await + .is_err()); + + assert!(!store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(!store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(!store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_1.exists(new_path.as_path()).await.unwrap()); + assert!(!store_2.exists(new_path.as_path()).await.unwrap()); + assert!(!store_3.exists(new_path.as_path()).await.unwrap()); + } + + // Copy + + #[tokio::test] + async fn copy_should_pass_when_backup_all_policy() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::mem::new(); + let store_3 = drivers::mem::new(); + + let strategy: Box = Box::new(BackupStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::BackupAll, + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let orig_path = PathBuf::from("users").join("data").join("1.txt"); + let new_path = PathBuf::from("data-2").join("data").join("2.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage + .upload(orig_path.as_path(), &file_content) + .await + .is_ok()); + + assert!(store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(store_2.exists(orig_path.as_path()).await.unwrap()); + 
assert!(store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(storage + .copy(orig_path.as_path(), new_path.as_path()) + .await + .is_ok()); + + assert!(store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_1.exists(new_path.as_path()).await.unwrap()); + assert!(store_2.exists(new_path.as_path()).await.unwrap()); + assert!(store_3.exists(new_path.as_path()).await.unwrap()); + } + + #[tokio::test] + async fn copy_should_pass_when_allow_backup_failure_policy() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::mem::new(); + let store_3 = drivers::mem::new(); + + let strategy: Box = Box::new(BackupStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::AllowBackupFailure, + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let orig_path = PathBuf::from("users").join("data").join("1.txt"); + let new_path = PathBuf::from("data-2").join("data").join("2.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage + .upload(orig_path.as_path(), &file_content) + .await + .is_ok()); + + assert!(store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_2.delete(orig_path.as_path()).await.is_ok()); + + assert!(storage + .copy(orig_path.as_path(), new_path.as_path()) + .await + .is_ok()); + + assert!(store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(!store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_1.exists(new_path.as_path()).await.unwrap()); + 
assert!(!store_2.exists(new_path.as_path()).await.unwrap()); + assert!(store_3.exists(new_path.as_path()).await.unwrap()); + } + + #[tokio::test] + async fn copy_should_pass_when_at_least_one_failure_policy() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::mem::new(); + let store_3 = drivers::mem::new(); + + let strategy: Box = Box::new(BackupStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::AtLeastOneFailure, + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let orig_path = PathBuf::from("users").join("data").join("1.txt"); + let new_path = PathBuf::from("data-2").join("data").join("2.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage + .upload(orig_path.as_path(), &file_content) + .await + .is_ok()); + + assert!(store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_2.delete(orig_path.as_path()).await.is_ok()); + + assert!(storage + .copy(orig_path.as_path(), new_path.as_path()) + .await + .is_ok()); + + assert!(store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(!store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_1.exists(new_path.as_path()).await.unwrap()); + assert!(!store_2.exists(new_path.as_path()).await.unwrap()); + assert!(store_3.exists(new_path.as_path()).await.unwrap()); + } + + #[tokio::test] + async fn copy_should_fail_when_at_least_one_failure_policy() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::mem::new(); + let store_3 = drivers::mem::new(); + + let strategy: Box = Box::new(BackupStrategy::new( + "store_1", + 
Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::AtLeastOneFailure, + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let orig_path = PathBuf::from("users").join("data").join("1.txt"); + let new_path = PathBuf::from("data-2").join("data").join("2.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage + .upload(orig_path.as_path(), &file_content) + .await + .is_ok()); + + assert!(store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_2.delete(orig_path.as_path()).await.is_ok()); + assert!(store_3.delete(orig_path.as_path()).await.is_ok()); + + assert!(storage + .copy(orig_path.as_path(), new_path.as_path()) + .await + .is_err()); + + assert!(store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(!store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(!store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_1.exists(new_path.as_path()).await.unwrap()); + assert!(!store_2.exists(new_path.as_path()).await.unwrap()); + assert!(!store_3.exists(new_path.as_path()).await.unwrap()); + } + + #[tokio::test] + async fn copy_should_pass_when_count_fail_policy() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::mem::new(); + let store_3 = drivers::mem::new(); + + let strategy: Box = Box::new(BackupStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::CountFailure(2), + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let orig_path = 
PathBuf::from("users").join("data").join("1.txt"); + let new_path = PathBuf::from("data-2").join("data").join("2.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage + .upload(orig_path.as_path(), &file_content) + .await + .is_ok()); + + assert!(store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_2.delete(orig_path.as_path()).await.is_ok()); + + assert!(storage + .copy(orig_path.as_path(), new_path.as_path()) + .await + .is_ok()); + + assert!(store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(!store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_1.exists(new_path.as_path()).await.unwrap()); + assert!(!store_2.exists(new_path.as_path()).await.unwrap()); + assert!(store_3.exists(new_path.as_path()).await.unwrap()); + } + + #[tokio::test] + async fn copy_should_fail_when_count_fail_policy() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::mem::new(); + let store_3 = drivers::mem::new(); + + let strategy: Box = Box::new(BackupStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::CountFailure(2), + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let orig_path = PathBuf::from("users").join("data").join("1.txt"); + let new_path = PathBuf::from("data-2").join("data").join("2.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage + .upload(orig_path.as_path(), &file_content) + .await + .is_ok()); + + assert!(store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(store_2.exists(orig_path.as_path()).await.unwrap()); + 
assert!(store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_2.delete(orig_path.as_path()).await.is_ok()); + assert!(store_3.delete(orig_path.as_path()).await.is_ok()); + + assert!(storage + .copy(orig_path.as_path(), new_path.as_path()) + .await + .is_err()); + + assert!(store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(!store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(!store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_1.exists(new_path.as_path()).await.unwrap()); + assert!(!store_2.exists(new_path.as_path()).await.unwrap()); + assert!(!store_3.exists(new_path.as_path()).await.unwrap()); + } +} diff --git a/src/storage/strategies/mirror.rs b/src/storage/strategies/mirror.rs new file mode 100644 index 000000000..4a39ca087 --- /dev/null +++ b/src/storage/strategies/mirror.rs @@ -0,0 +1,708 @@ +//! # `MirrorStrategy` Implementation for Storage Strategies +//! +//! This module provides an implementation of the [`StorageStrategyTrait`] for +//! the [`MirrorStrategy`]. The [`MirrorStrategy`] is designed to mirror storage +//! operations. +//! +//! ## Strategy Description per operation +//! +//! * `upload`/`delete`/`rename`/`copy`: The primary storage must succeed in the +//! given operation. If there is any failure with the primary storage, this +//! function returns an error. When +//! * [`FailureMode::MirrorAll`] is given - all the secondary storages must +//! succeed. If there is one failure in the mirror, the operation continues +//! to the rest but returns an error. +//! * [`FailureMode::AllowMirrorFailure`] is given - the operation does not +//! return an error when one or more mirror operations fail. +//! +//! * `download`: Initiates the download of the given path from the primary +//! storage. If successful, it returns the content. If not found in the +//! primary, it looks for the content in the secondary storages. If the +//! 
content is not found in any storage backend (both primary and secondary),
+//! it returns an error.
+use std::{collections::BTreeMap, path::Path};
+
+use bytes::Bytes;
+
+use crate::storage::{
+    error::{StorageError, StorageResult, StoreError},
+    strategies::StorageStrategyTrait,
+    Storage,
+};
+
+/// Enum representing the failure mode for the [`MirrorStrategy`].
+#[derive(Clone)]
+pub enum FailureMode {
+    /// Fail if any secondary storage mirror encounters an error.
+    MirrorAll,
+    /// Allow errors from secondary storage mirrors without failing.
+    AllowMirrorFailure,
+}
+
+/// Represents the Mirror Strategy for storage operations.
+#[derive(Clone)]
+pub struct MirrorStrategy {
+    /// The primary storage backend.
+    pub primary: String,
+    /// Optional secondary storage backends.
+    pub secondaries: Option<Vec<String>>,
+    /// The failure mode for handling errors from secondary storage backends.
+    pub failure_mode: FailureMode,
+}
+
+/// Implementation of the [`StorageStrategyTrait`] for the [`MirrorStrategy`].
+///
+/// The [`MirrorStrategy`] is designed to mirror operations (upload, download,
+/// delete, rename, copy) across multiple storage backends, with optional
+/// secondary storage support and customizable failure modes.
+#[async_trait::async_trait]
+impl StorageStrategyTrait for MirrorStrategy {
+    /// Uploads content to the primary and, if configured, secondary storage
+    /// mirrors.
+    ///
+    /// # Errors
+    ///
+    /// Returns a [`StorageResult`] indicating success or an error depending on
+    /// the [`FailureMode`].
+    async fn upload(&self, storage: &Storage, path: &Path, content: &Bytes) -> StorageResult<()> {
+        storage
+            .as_store_err(&self.primary)?
+ .upload(path, content) + .await?; + + let mut collect_errors: BTreeMap = BTreeMap::new(); + if let Some(secondaries) = self.secondaries.as_ref() { + for secondary_store in secondaries { + match storage.as_store_err(secondary_store) { + Ok(store) => { + if let Err(err) = store.upload(path, content).await { + collect_errors.insert(secondary_store.to_string(), err.to_string()); + } + } + Err(err) => { + collect_errors.insert(secondary_store.to_string(), err.to_string()); + } + }; + } + } + + if self.failure_mode.should_fail(&collect_errors) { + return Err(StorageError::Multi(collect_errors)); + } + + Ok(()) + } + + /// Downloads content from the primary storage backend. If the primary + /// fails, attempts to download from secondary backends. + async fn download(&self, storage: &Storage, path: &Path) -> StorageResult { + let res = Self::try_download(storage, &self.primary, path).await; + + match res { + Ok(content) => Ok(content), + Err(error) => { + if let Some(secondaries) = self.secondaries.as_ref() { + for secondary_store in secondaries { + if let Ok(content) = + Self::try_download(storage, secondary_store, path).await + { + return Ok(content); + } + } + } + + return Err(error); + } + } + } + + /// Deletes content from the primary and, if configured, secondary storage + /// mirrors. + /// + /// # Errors + /// + /// Returns a [`StorageResult`] indicating success or an error depend of the + /// [`FailureMode`]. 
+    async fn delete(&self, storage: &Storage, path: &Path) -> StorageResult<()> {
+        storage.as_store_err(&self.primary)?.delete(path).await?;
+
+        let mut collect_errors: BTreeMap<String, String> = BTreeMap::new();
+        if let Some(secondaries) = self.secondaries.as_ref() {
+            for secondary_store in secondaries {
+                match storage.as_store_err(secondary_store) {
+                    Ok(store) => {
+                        if let Err(err) = store.delete(path).await {
+                            collect_errors.insert(secondary_store.to_string(), err.to_string());
+                        }
+                    }
+                    Err(err) => {
+                        collect_errors.insert(secondary_store.to_string(), err.to_string());
+                    }
+                };
+            }
+        }
+        if self.failure_mode.should_fail(&collect_errors) {
+            return Err(StorageError::Multi(collect_errors));
+        }
+
+        Ok(())
+    }
+
+    /// Renames content on the primary and, if configured, secondary storage
+    /// mirrors.
+    ///
+    /// # Errors
+    ///
+    /// Returns a [`StorageResult`] indicating success or an error depending on
+    /// the [`FailureMode`].
+    async fn rename(&self, storage: &Storage, from: &Path, to: &Path) -> StorageResult<()> {
+        storage
+            .as_store_err(&self.primary)?
+            .rename(from, to)
+            .await?;
+
+        if let Some(secondaries) = self.secondaries.as_ref() {
+            let mut collect_errors: BTreeMap<String, String> = BTreeMap::new();
+            for secondary_store in secondaries {
+                match storage.as_store_err(secondary_store) {
+                    Ok(store) => {
+                        if let Err(err) = store.rename(from, to).await {
+                            collect_errors.insert(secondary_store.to_string(), err.to_string());
+                        }
+                    }
+                    Err(err) => {
+                        collect_errors.insert(secondary_store.to_string(), err.to_string());
+                    }
+                };
+            }
+
+            // Evaluate the failure mode only after every mirror was attempted,
+            // matching `upload`/`delete` and the documented `MirrorAll` behavior
+            // ("the operation continues to the rest but returns an error").
+            if self.failure_mode.should_fail(&collect_errors) {
+                return Err(StorageError::Multi(collect_errors));
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Copies content on the primary and, if configured, secondary storage
+    /// mirrors.
+    ///
+    /// # Errors
+    ///
+    /// Returns a [`StorageResult`] indicating success or an error depending on
+    /// the [`FailureMode`].
+    async fn copy(&self, storage: &Storage, from: &Path, to: &Path) -> StorageResult<()> {
+        storage.as_store_err(&self.primary)?.copy(from, to).await?;
+
+        if let Some(secondaries) = self.secondaries.as_ref() {
+            let mut collect_errors: BTreeMap<String, String> = BTreeMap::new();
+            for secondary_store in secondaries {
+                match storage.as_store_err(secondary_store) {
+                    Ok(store) => {
+                        if let Err(err) = store.copy(from, to).await {
+                            collect_errors.insert(secondary_store.to_string(), err.to_string());
+                        }
+                    }
+                    Err(err) => {
+                        collect_errors.insert(secondary_store.to_string(), err.to_string());
+                    }
+                };
+            }
+
+            // Evaluate the failure mode only after every mirror was attempted,
+            // matching `upload`/`delete` and the documented `MirrorAll` behavior.
+            if self.failure_mode.should_fail(&collect_errors) {
+                return Err(StorageError::Multi(collect_errors));
+            }
+        }
+
+        Ok(())
+    }
+}
+
+impl MirrorStrategy {
+    /// Creates a new instance of [`MirrorStrategy`].
+    #[must_use]
+    pub fn new(primary: &str, secondaries: Option<Vec<String>>, failure_mode: FailureMode) -> Self {
+        Self {
+            primary: primary.to_string(),
+            secondaries,
+            failure_mode,
+        }
+    }
+
+    // Private helper function for downloading from a specific store.
+    async fn try_download(
+        storage: &Storage,
+        store_name: &str,
+        path: &Path,
+    ) -> StorageResult<Bytes> {
+        let store = storage.as_store_err(store_name)?;
+        store
+            .get(path)
+            .await?
+ .bytes() + .await + .map_err(|e| StorageError::Storage(StoreError::Storage(e))) + } +} + +impl FailureMode { + #[must_use] + pub fn should_fail(&self, errors: &BTreeMap) -> bool { + match self { + Self::MirrorAll => !errors.is_empty(), + Self::AllowMirrorFailure => false, + } + } +} + +#[cfg(test)] +mod tests { + + use std::{collections::BTreeMap, path::PathBuf}; + + use super::*; + use crate::storage::{drivers, Storage}; + + #[tokio::test] + async fn upload_should_pass_with_mirror_all_policy() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::mem::new(); + let store_3 = drivers::mem::new(); + + let strategy = Box::new(MirrorStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::MirrorAll, + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let path = PathBuf::from("users").join("data").join("1.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage.upload(path.as_path(), &file_content).await.is_ok()); + + assert!(store_1.exists(path.as_path()).await.unwrap()); + assert!(store_2.exists(path.as_path()).await.unwrap()); + assert!(store_3.exists(path.as_path()).await.unwrap()); + } + + #[tokio::test] + async fn upload_should_fail_with_mirror_all_policy() { + let store_1 = drivers::aws::with_failure(); + let store_2 = drivers::mem::new(); + let store_3 = drivers::mem::new(); + + let strategy = Box::new(MirrorStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::MirrorAll, + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let path = PathBuf::from("users").join("data").join("1.txt"); + 
let file_content = Bytes::from("file content");
+
+        assert!(storage.upload(path.as_path(), &file_content).await.is_err());
+
+        assert!(!store_1.exists(path.as_path()).await.unwrap());
+        assert!(!store_2.exists(path.as_path()).await.unwrap());
+        assert!(!store_3.exists(path.as_path()).await.unwrap());
+    }
+
+    #[tokio::test]
+    async fn upload_should_pass_when_allow_mirror_failure_policy() {
+        let store_1 = drivers::mem::new();
+        let store_2 = drivers::aws::with_failure();
+        let store_3 = drivers::mem::new();
+
+        let strategy = Box::new(MirrorStrategy::new(
+            "store_1",
+            Some(vec!["store_2".to_string(), "store_3".to_string()]),
+            FailureMode::AllowMirrorFailure,
+        )) as Box<dyn StorageStrategyTrait>;
+
+        let storage = Storage::new(
+            BTreeMap::from([
+                ("store_1".to_string(), store_1.clone()),
+                ("store_2".to_string(), store_2.clone()),
+                ("store_3".to_string(), store_3.clone()),
+            ]),
+            strategy.into(),
+        );
+
+        let path = PathBuf::from("users").join("data").join("1.txt");
+        let file_content = Bytes::from("file content");
+
+        assert!(storage.upload(path.as_path(), &file_content).await.is_ok());
+
+        assert!(store_1.exists(path.as_path()).await.unwrap());
+        assert!(!store_2.exists(path.as_path()).await.unwrap());
+        assert!(store_3.exists(path.as_path()).await.unwrap());
+    }
+
+    #[tokio::test]
+    async fn can_download_when_primary_is_ok() {
+        let store_1 = drivers::mem::new();
+        let store_2 = drivers::mem::new();
+        let store_3 = drivers::mem::new();
+
+        let strategy = Box::new(MirrorStrategy::new(
+            "store_1",
+            Some(vec!["store_2".to_string(), "store_3".to_string()]),
+            FailureMode::MirrorAll,
+        )) as Box<dyn StorageStrategyTrait>;
+
+        let storage = Storage::new(
+            BTreeMap::from([
+                ("store_1".to_string(), store_1.clone()),
+                ("store_2".to_string(), store_2.clone()),
+                ("store_3".to_string(), store_3.clone()),
+            ]),
+            strategy.into(),
+        );
+
+        let path = PathBuf::from("users").join("data").join("1.txt");
+        let file_content = Bytes::from("file content");
+
+        assert!(storage.upload(path.as_path(),
&file_content).await.is_ok()); + + let content: String = storage.download(path.as_path()).await.unwrap(); + assert_eq!(content, "file content".to_string()); + + assert!(store_1.exists(path.as_path()).await.unwrap()); + assert!(store_2.exists(path.as_path()).await.unwrap()); + assert!(store_3.exists(path.as_path()).await.unwrap()); + } + + #[tokio::test] + async fn can_download_when_primary_failed() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::mem::new(); + let store_3 = drivers::mem::new(); + + let strategy = Box::new(MirrorStrategy::new( + "store_1", + Some(vec![ + "store_1".to_string(), + "store_2".to_string(), + "store_3".to_string(), + ]), + FailureMode::MirrorAll, + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let path = PathBuf::from("users").join("data").join("1.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage.upload(path.as_path(), &file_content).await.is_ok()); + + assert!(store_1.delete(path.as_path()).await.is_ok()); + assert!(store_2.delete(path.as_path()).await.is_ok()); + + assert!(!store_1.exists(path.as_path()).await.unwrap()); + assert!(!store_2.exists(path.as_path()).await.unwrap()); + assert!(store_3.exists(path.as_path()).await.unwrap()); + + let content: String = storage.download(path.as_path()).await.unwrap(); + assert_eq!(content, "file content".to_string()); + } + + #[tokio::test] + async fn rename_should_pass_when_primary_is_ok() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::mem::new(); + let store_3 = drivers::mem::new(); + + let strategy = Box::new(MirrorStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::MirrorAll, + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + 
("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let orig_path = PathBuf::from("users").join("data").join("1.txt"); + let new_path = PathBuf::from("data-2").join("data").join("2.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage + .upload(orig_path.as_path(), &file_content) + .await + .is_ok()); + + assert!(store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(storage + .rename(orig_path.as_path(), new_path.as_path()) + .await + .is_ok()); + + assert!(!store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(!store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(!store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_1.exists(new_path.as_path()).await.unwrap()); + assert!(store_2.exists(new_path.as_path()).await.unwrap()); + assert!(store_3.exists(new_path.as_path()).await.unwrap()); + } + + #[tokio::test] + async fn rename_should_fail_when_primary_failed() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::mem::new(); + let store_3 = drivers::mem::new(); + + let strategy: Box = Box::new(MirrorStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::MirrorAll, + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let orig_path = PathBuf::from("users").join("data").join("1.txt"); + let new_path = PathBuf::from("data-2").join("data").join("2.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage + .upload(orig_path.as_path(), &file_content) + .await + .is_ok()); + + assert!(store_1.exists(orig_path.as_path()).await.unwrap()); + 
assert!(store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_2.delete(orig_path.as_path()).await.is_ok()); + + assert!(storage + .rename(orig_path.as_path(), new_path.as_path()) + .await + .is_err()); + } + + #[tokio::test] + async fn rename_should_pass_when_allow_mirror_failure() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::mem::new(); + let store_3 = drivers::mem::new(); + + let strategy: Box = Box::new(MirrorStrategy::new( + "store_1", + Some(vec!["store_2".to_string(), "store_3".to_string()]), + FailureMode::AllowMirrorFailure, + )) as Box; + + let storage = Storage::new( + BTreeMap::from([ + ("store_1".to_string(), store_1.clone()), + ("store_2".to_string(), store_2.clone()), + ("store_3".to_string(), store_3.clone()), + ]), + strategy.into(), + ); + + let orig_path = PathBuf::from("users").join("data").join("1.txt"); + let new_path = PathBuf::from("data-2").join("data").join("2.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage + .upload(orig_path.as_path(), &file_content) + .await + .is_ok()); + + assert!(store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(store_2.exists(orig_path.as_path()).await.unwrap()); + assert!(store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_2.delete(orig_path.as_path()).await.is_ok()); + + assert!(storage + .rename(orig_path.as_path(), new_path.as_path()) + .await + .is_ok()); + + assert!(!store_1.exists(orig_path.as_path()).await.unwrap()); + assert!(!store_3.exists(orig_path.as_path()).await.unwrap()); + + assert!(store_1.exists(new_path.as_path()).await.unwrap()); + assert!(store_3.exists(new_path.as_path()).await.unwrap()); + } + + #[tokio::test] + async fn copy_should_pass_when_primary_is_ok() { + let store_1 = drivers::mem::new(); + let store_2 = drivers::mem::new(); + let store_3 = drivers::mem::new(); + + let strategy = Box::new(MirrorStrategy::new( + "store_1", + 
Some(vec!["store_2".to_string(), "store_3".to_string()]),
+            FailureMode::MirrorAll,
+        )) as Box<dyn StorageStrategyTrait>;
+
+        let storage = Storage::new(
+            BTreeMap::from([
+                ("store_1".to_string(), store_1.clone()),
+                ("store_2".to_string(), store_2.clone()),
+                ("store_3".to_string(), store_3.clone()),
+            ]),
+            strategy.into(),
+        );
+
+        let orig_path = PathBuf::from("users").join("data").join("1.txt");
+        let new_path = PathBuf::from("data-2").join("data").join("2.txt");
+        let file_content = Bytes::from("file content");
+
+        assert!(storage
+            .upload(orig_path.as_path(), &file_content)
+            .await
+            .is_ok());
+
+        assert!(store_1.exists(orig_path.as_path()).await.unwrap());
+        assert!(store_2.exists(orig_path.as_path()).await.unwrap());
+        assert!(store_3.exists(orig_path.as_path()).await.unwrap());
+
+        assert!(storage
+            .copy(orig_path.as_path(), new_path.as_path())
+            .await
+            .is_ok());
+
+        assert!(store_1.exists(orig_path.as_path()).await.unwrap());
+        assert!(store_2.exists(orig_path.as_path()).await.unwrap());
+        assert!(store_3.exists(orig_path.as_path()).await.unwrap());
+
+        assert!(store_1.exists(new_path.as_path()).await.unwrap());
+        assert!(store_2.exists(new_path.as_path()).await.unwrap());
+        assert!(store_3.exists(new_path.as_path()).await.unwrap());
+    }
+
+    #[tokio::test]
+    async fn copy_should_fail_when_primary_failed() {
+        let store_1 = drivers::mem::new();
+        let store_2 = drivers::mem::new();
+        let store_3 = drivers::mem::new();
+
+        let strategy: Box<dyn StorageStrategyTrait> = Box::new(MirrorStrategy::new(
+            "store_1",
+            Some(vec!["store_2".to_string(), "store_3".to_string()]),
+            FailureMode::MirrorAll,
+        )) as Box<dyn StorageStrategyTrait>;
+
+        let storage = Storage::new(
+            BTreeMap::from([
+                ("store_1".to_string(), store_1.clone()),
+                ("store_2".to_string(), store_2.clone()),
+                ("store_3".to_string(), store_3.clone()),
+            ]),
+            strategy.into(),
+        );
+
+        let orig_path = PathBuf::from("users").join("data").join("1.txt");
+        let new_path = PathBuf::from("data-2").join("data").join("2.txt");
+        let file_content = Bytes::from("file content");
+
+        assert!(storage
+            .upload(orig_path.as_path(), &file_content)
+            .await
+            .is_ok());
+
+        assert!(store_1.exists(orig_path.as_path()).await.unwrap());
+        assert!(store_2.exists(orig_path.as_path()).await.unwrap());
+        assert!(store_3.exists(orig_path.as_path()).await.unwrap());
+
+        assert!(store_2.delete(orig_path.as_path()).await.is_ok());
+
+        assert!(storage
+            .copy(orig_path.as_path(), new_path.as_path())
+            .await
+            .is_err());
+    }
+
+    #[tokio::test]
+    async fn copy_should_pass_when_allow_mirror_failure() {
+        let store_1 = drivers::mem::new();
+        let store_2 = drivers::mem::new();
+        let store_3 = drivers::mem::new();
+
+        let strategy: Box<dyn StorageStrategyTrait> = Box::new(MirrorStrategy::new(
+            "store_1",
+            Some(vec!["store_2".to_string(), "store_3".to_string()]),
+            FailureMode::AllowMirrorFailure,
+        )) as Box<dyn StorageStrategyTrait>;
+
+        let storage = Storage::new(
+            BTreeMap::from([
+                ("store_1".to_string(), store_1.clone()),
+                ("store_2".to_string(), store_2.clone()),
+                ("store_3".to_string(), store_3.clone()),
+            ]),
+            strategy.into(),
+        );
+
+        let orig_path = PathBuf::from("users").join("data").join("1.txt");
+        let new_path = PathBuf::from("data-2").join("data").join("2.txt");
+        let file_content = Bytes::from("file content");
+
+        assert!(storage
+            .upload(orig_path.as_path(), &file_content)
+            .await
+            .is_ok());
+
+        assert!(store_1.exists(orig_path.as_path()).await.unwrap());
+        assert!(store_2.exists(orig_path.as_path()).await.unwrap());
+        assert!(store_3.exists(orig_path.as_path()).await.unwrap());
+
+        assert!(store_2.delete(orig_path.as_path()).await.is_ok());
+
+        assert!(storage
+            .copy(orig_path.as_path(), new_path.as_path())
+            .await
+            .is_ok());
+
+        assert!(store_1.exists(orig_path.as_path()).await.unwrap());
+        assert!(store_3.exists(orig_path.as_path()).await.unwrap());
+
+        assert!(store_1.exists(new_path.as_path()).await.unwrap());
+        assert!(store_3.exists(new_path.as_path()).await.unwrap());
+    }
+}
diff --git a/src/storage/strategies/mod.rs b/src/storage/strategies/mod.rs
new file mode
100644
index 000000000..e122f9d37
--- /dev/null
+++ b/src/storage/strategies/mod.rs
@@ -0,0 +1,18 @@
+pub mod backup;
+pub mod mirror;
+pub mod single;
+
+use std::path::Path;
+
+use bytes::Bytes;
+
+use crate::storage::{error::StorageResult, Storage};
+
+#[async_trait::async_trait]
+pub trait StorageStrategyTrait: Sync + Send {
+    async fn upload(&self, storage: &Storage, path: &Path, content: &Bytes) -> StorageResult<()>;
+    async fn download(&self, storage: &Storage, path: &Path) -> StorageResult<Bytes>;
+    async fn delete(&self, storage: &Storage, path: &Path) -> StorageResult<()>;
+    async fn rename(&self, storage: &Storage, from: &Path, to: &Path) -> StorageResult<()>;
+    async fn copy(&self, storage: &Storage, from: &Path, to: &Path) -> StorageResult<()>;
+}
diff --git a/src/storage/strategies/single.rs b/src/storage/strategies/single.rs
new file mode 100644
index 000000000..ae3bd14f9
--- /dev/null
+++ b/src/storage/strategies/single.rs
@@ -0,0 +1,225 @@
+//! # Single Storage Strategy Implementation
+//!
+//! This module provides an implementation of the [`StorageStrategyTrait`] for a
+//! single storage strategy.
+use std::path::Path;
+
+use bytes::Bytes;
+
+use crate::storage::{
+    error::{StorageError, StorageResult, StoreError},
+    strategies::StorageStrategyTrait,
+    Storage,
+};
+
+/// Represents a single storage strategy.
+#[derive(Clone)]
+pub struct SingleStrategy {
+    pub primary: String,
+}
+
+impl SingleStrategy {
+    /// Creates a new instance of `SingleStrategy` with the specified primary
+    /// storage identifier.
+    #[must_use]
+    pub fn new(primary: &str) -> Self {
+        Self {
+            primary: primary.to_string(),
+        }
+    }
+}
+
+/// Implementation of `StorageStrategyTrait` for a single storage strategy.
+#[async_trait::async_trait]
+impl StorageStrategyTrait for SingleStrategy {
+    /// Uploads content to the primary storage.
+    ///
+    /// # Errors
+    ///
+    /// Returns a [`StorageResult`] indicating the operation status.
+    async fn upload(&self, storage: &Storage, path: &Path, content: &Bytes) -> StorageResult<()> {
+        storage
+            .as_store_err(&self.primary)?
+            .upload(path, content)
+            .await?;
+        Ok(())
+    }
+
+    /// Downloads content
+    ///
+    /// # Errors
+    ///
+    /// Returns a [`StorageResult`] indicating the operation status.
+    async fn download(&self, storage: &Storage, path: &Path) -> StorageResult<Bytes> {
+        let store = storage.as_store_err(&self.primary)?;
+        Ok(store
+            .get(path)
+            .await?
+            .bytes()
+            .await
+            .map_err(|e| StorageError::Storage(StoreError::Storage(e)))?)
+    }
+
+    /// Deletes the given path
+    ///
+    /// # Errors
+    ///
+    /// Returns a [`StorageResult`] indicating the operation status.
+    async fn delete(&self, storage: &Storage, path: &Path) -> StorageResult<()> {
+        Ok(storage.as_store_err(&self.primary)?.delete(path).await?)
+    }
+
+    /// Renames the file name
+    ///
+    /// # Errors
+    ///
+    /// Returns a [`StorageResult`] indicating the operation status.
+    async fn rename(&self, storage: &Storage, from: &Path, to: &Path) -> StorageResult<()> {
+        Ok(storage
+            .as_store_err(&self.primary)?
+            .rename(from, to)
+            .await?)
+    }
+
+    /// Copy file from the given path to the new path
+    ///
+    /// # Errors
+    ///
+    /// Returns a [`StorageResult`] indicating the operation status.
+    async fn copy(&self, storage: &Storage, from: &Path, to: &Path) -> StorageResult<()> {
+        Ok(storage.as_store_err(&self.primary)?.copy(from, to).await?)
+ } +} + +#[cfg(test)] +mod tests { + + use std::{collections::BTreeMap, path::PathBuf}; + + use super::*; + use crate::storage::{drivers, Storage}; + + #[tokio::test] + async fn can_upload() { + let store = drivers::mem::new(); + + let strategy = Box::new(SingleStrategy::new("default")) as Box; + + let storage = Storage::new( + BTreeMap::from([("default".to_string(), store.clone())]), + strategy.into(), + ); + + let path = PathBuf::from("users").join("data").join("1.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage.upload(path.as_path(), &file_content).await.is_ok()); + + assert!(store.exists(path.as_path()).await.unwrap()); + } + + #[tokio::test] + async fn can_download() { + let store = drivers::mem::new(); + + let strategy = Box::new(SingleStrategy::new("default")) as Box; + + let storage = Storage::new( + BTreeMap::from([("default".to_string(), store.clone())]), + strategy.into(), + ); + + let path = PathBuf::from("users").join("data").join("1.txt"); + let file_content = Bytes::from("file content"); + + assert!(store.upload(path.as_path(), &file_content).await.is_ok()); + + let download_file: String = storage.download(path.as_path()).await.unwrap(); + assert_eq!(download_file, file_content); + } + + #[tokio::test] + async fn can_delete() { + let store = drivers::mem::new(); + + let strategy = Box::new(SingleStrategy::new("default")) as Box; + + let storage = Storage::new( + BTreeMap::from([("default".to_string(), store.clone())]), + strategy.into(), + ); + + let path = PathBuf::from("users").join("data").join("1.txt"); + let file_content = Bytes::from("file content"); + + assert!(store.upload(path.as_path(), &file_content).await.is_ok()); + + assert!(store.exists(path.as_path()).await.unwrap()); + + assert!(storage.delete(path.as_path()).await.is_ok()); + + assert!(!store.exists(path.as_path()).await.unwrap()); + } + + #[tokio::test] + async fn can_rename_file_path() { + let store = drivers::mem::new(); + + let strategy = 
Box::new(SingleStrategy::new("default")) as Box; + + let storage = Storage::new( + BTreeMap::from([("default".to_string(), store.clone())]), + strategy.into(), + ); + + let orig_path = PathBuf::from("users").join("data").join("1.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage + .upload(orig_path.as_path(), &file_content) + .await + .is_ok()); + + assert!(store.exists(orig_path.as_path()).await.unwrap()); + + let new_path = PathBuf::from("users").join("data-2").join("2.txt"); + assert!(storage + .rename(orig_path.as_path(), new_path.as_path()) + .await + .is_ok()); + + assert!(!store.exists(orig_path.as_path()).await.unwrap()); + assert!(store.exists(new_path.as_path()).await.unwrap()); + } + + #[tokio::test] + async fn can_copy_file_path() { + let store = drivers::mem::new(); + + let strategy = Box::new(SingleStrategy::new("default")) as Box; + + let storage = Storage::new( + BTreeMap::from([("default".to_string(), store.clone())]), + strategy.into(), + ); + + let orig_path = PathBuf::from("users").join("data").join("1.txt"); + let file_content = Bytes::from("file content"); + + assert!(storage + .upload(orig_path.as_path(), &file_content) + .await + .is_ok()); + + assert!(store.exists(orig_path.as_path()).await.unwrap()); + + let new_path = PathBuf::from("users").join("data-2").join("2.txt"); + assert!(storage + .copy(orig_path.as_path(), new_path.as_path()) + .await + .is_ok()); + + assert!(store.exists(orig_path.as_path()).await.unwrap()); + assert!(store.exists(new_path.as_path()).await.unwrap()); + } +}