diff --git a/.gitignore b/.gitignore index 83abdf7b..8358af30 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,7 @@ /content /target *.swp +content + +.env + diff --git a/Cargo.lock b/Cargo.lock index ec747c43..f03f8818 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -280,6 +280,367 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "aws-config" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bcdcf0d683fe9c23d32cf5b53c9918ea0a500375a9fb20109802552658e576c9" +dependencies = [ + "aws-credential-types", + "aws-http", + "aws-sdk-sso", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-json", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "hex", + "http", + "hyper", + "ring", + "time 0.3.21", + "tokio", + "tower", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-credential-types" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fcdb2f7acbc076ff5ad05e7864bdb191ca70a6fd07668dc3a1a8bcd051de5ae" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types", + "fastrand", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-endpoint" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cce1c41a6cfaa726adee9ebb9a56fcd2bbfd8be49fd8a04c5e20fd968330b04" +dependencies = [ + "aws-smithy-http", + "aws-smithy-types", + "aws-types", + "http", + "regex", + "tracing", +] + +[[package]] +name = "aws-http" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aadbc44e7a8f3e71c8b374e03ecd972869eb91dd2bc89ed018954a52ba84bc44" +dependencies = [ + "aws-credential-types", + "aws-smithy-http", + "aws-smithy-types", + "aws-types", + "bytes", + "http", + "http-body", + "lazy_static", + "percent-encoding", + "pin-project-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-s3" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fba197193cbb4bcb6aad8d99796b2291f36fa89562ded5d4501363055b0de89f" +dependencies = [ + "aws-credential-types", + "aws-endpoint", + "aws-http", + "aws-sig-auth", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-checksums", + "aws-smithy-client", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-json", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes", + "http", + "http-body", + "once_cell", + "percent-encoding", + "regex", + "tokio-stream", + "tower", + "tracing", + "url", +] + +[[package]] +name = "aws-sdk-sso" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8b812340d86d4a766b2ca73f740dfd47a97c2dff0c06c8517a16d88241957e4" +dependencies = [ + "aws-credential-types", + "aws-endpoint", + "aws-http", + "aws-sig-auth", + "aws-smithy-async", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-json", + "aws-smithy-types", + "aws-types", + "bytes", + "http", + "regex", + "tokio-stream", + "tower", + "tracing", +] + +[[package]] +name = "aws-sdk-sts" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "265fac131fbfc188e5c3d96652ea90ecc676a934e3174eaaee523c6cec040b3b" +dependencies = [ + "aws-credential-types", + "aws-endpoint", + "aws-http", + "aws-sig-auth", + "aws-smithy-async", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes", + "http", + "regex", + "tower", + "tracing", +] + +[[package]] +name = "aws-sig-auth" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b94acb10af0c879ecd5c7bdf51cda6679a0a4f4643ce630905a77673bfa3c61" +dependencies = [ + "aws-credential-types", + "aws-sigv4", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-types", + "http", + "tracing", +] + +[[package]] +name = "aws-sigv4" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d2ce6f507be68e968a33485ced670111d1cbad161ddbbab1e313c03d37d8f4c" +dependencies = [ + "aws-smithy-eventstream", + "aws-smithy-http", + "bytes", + "form_urlencoded", + "hex", + "hmac", + "http", + "once_cell", + "percent-encoding", + "regex", + "sha2", + "time 0.3.21", + "tracing", +] + +[[package]] +name = "aws-smithy-async" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13bda3996044c202d75b91afeb11a9afae9db9a721c6a7a427410018e286b880" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", + "tokio-stream", +] + +[[package]] +name = "aws-smithy-checksums" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07ed8b96d95402f3f6b8b57eb4e0e45ee365f78b1a924faf20ff6e97abf1eae6" +dependencies = [ + "aws-smithy-http", + "aws-smithy-types", + "bytes", + "crc32c", + "crc32fast", + "hex", + "http", + "http-body", + "md-5", + "pin-project-lite", + "sha1", + "sha2", + "tracing", +] + +[[package]] +name = "aws-smithy-client" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a86aa6e21e86c4252ad6a0e3e74da9617295d8d6e374d552be7d3059c41cedd" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-http-tower", + "aws-smithy-types", + "bytes", + "fastrand", + "http", + "http-body", + "hyper", + "hyper-rustls", + "lazy_static", + "pin-project-lite", + "rustls", + "tokio", + "tower", + "tracing", +] + +[[package]] +name = "aws-smithy-eventstream" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "460c8da5110835e3d9a717c61f5556b20d03c32a1dec57f8fc559b360f733bb8" +dependencies = [ + "aws-smithy-types", + "bytes", + "crc32fast", +] + +[[package]] +name = "aws-smithy-http" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b3b693869133551f135e1f2c77cb0b8277d9e3e17feaf2213f735857c4f0d28" +dependencies = [ + "aws-smithy-eventstream", + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "http", + "http-body", + "hyper", + "once_cell", + "percent-encoding", + "pin-project-lite", + "pin-utils", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "aws-smithy-http-tower" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ae4f6c5798a247fac98a867698197d9ac22643596dc3777f0c76b91917616b9" +dependencies = [ + "aws-smithy-http", + "aws-smithy-types", + "bytes", + "http", + "http-body", + "pin-project-lite", + "tower", + "tracing", +] + +[[package]] +name = "aws-smithy-json" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23f9f42fbfa96d095194a632fbac19f60077748eba536eb0b9fecc28659807f8" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-query" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98819eb0b04020a1c791903533b638534ae6c12e2aceda3e6e6fba015608d51d" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-types" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16a3d0bf4f324f4ef9793b86a1701d9700fbcdbd12a846da45eed104c634c6e8" +dependencies = [ + "base64-simd", + "itoa", + "num-integer", + "ryu", + "time 0.3.21", +] + +[[package]] +name = "aws-smithy-xml" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1b9d12875731bd07e767be7baad95700c3137b56730ec9ddeedb52a5e5ca63b" +dependencies = [ + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "0.55.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dd209616cc8d7bfb82f87811a5c655dc97537f592689b18743bddf5dc5c4829" +dependencies = [ + "aws-credential-types", + "aws-smithy-async", + "aws-smithy-client", + "aws-smithy-http", + "aws-smithy-types", + "http", + "rustc_version", + "tracing", +] + [[package]] name = "axum" version = "0.6.18" @@ -376,6 +737,16 @@ version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d" +[[package]] +name = "base64-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" +dependencies = [ + "outref", + "vsimd", +] + [[package]] name = "base64ct" version = "1.6.0" @@ -467,6 +838,16 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +[[package]] +name = "bytes-utils" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e47d3a8076e283f3acd27400535992edb3ba4b5bb72f8891ad8fbe7932a7d4b9" +dependencies = [ + "bytes", + "either", +] + [[package]] name = "cap-fs-ext" version = "1.0.15" @@ -820,6 +1201,15 @@ dependencies = [ "wasmtime-types", ] +[[package]] +name = "crc32c" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8f48d60e5b4d2c53d5c2b1d8a58c849a70ae5e5509b08a48d047e3b65714a74" +dependencies = [ + "rustc_version", +] + [[package]] name = "crc32fast" version = "1.3.2" @@ -1695,6 +2085,15 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-auth" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5430cacd7a1f9a02fbeb350dfc81a0e5ed42d81f3398cb0ba184017f85bdcfbc" +dependencies = [ + "memchr", +] + [[package]] name = "http-body" version = "0.4.5" @@ -1754,6 +2153,39 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-proxy" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca815a891b24fdfb243fa3239c86154392b0953ee584aa1a2a1f66d20cbe75cc" +dependencies = [ + "bytes", + "futures", + "headers", + "http", + "hyper", + "hyper-tls", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + +[[package]] +name = "hyper-rustls" +version = "0.23.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c" +dependencies = [ + "http", + "hyper", + "log", + "rustls", + "rustls-native-certs", + "tokio", + "tokio-rustls", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -1943,6 +2375,21 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jwt" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6204285f77fe7d9784db3fdc449ecce1a0114927a51d5a41c4c7a292011c015f" +dependencies = [ + "base64 0.13.1", + "crypto-common", + "digest", + "hmac", + "serde", + "serde_json", + "sha2", +] + [[package]] name = "keyring" version = "2.0.4" @@ -2369,6 +2816,41 @@ dependencies = [ "memchr", ] +[[package]] +name = "oci-distribution" +version = "0.10.0" +source = "git+https://github.com/devigned/oci-distribution?branch=os-wasi#c161f079c98f03a39e9c24127b554b6ceeb83fb3" +dependencies = [ + "bytes", + "chrono", + "futures-util", + "http", + "http-auth", + "jwt", + "lazy_static", + "olpc-cjson", + "regex", + "reqwest", + "serde", + "serde_json", + "sha2", + "thiserror", + "tokio", + "tracing", + "unicase", +] + +[[package]] +name = "olpc-cjson" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d637c9c15b639ccff597da8f4fa968300651ad2f1e968aefc3b4927a6fb2027a" +dependencies = [ + "serde", + "serde_json", + "unicode-normalization", +] + [[package]] name = "once_cell" version = "1.18.0" @@ -2447,6 +2929,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "outref" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" + [[package]] name = "overload" version = "0.1.1" @@ -3097,6 +3585,21 @@ dependencies = [ "subtle", ] +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted", + "web-sys", + "winapi", +] + [[package]] name = "rpassword" version = "7.2.0" @@ -3130,6 +3633,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rustix" version = "0.37.23" @@ -3159,6 +3671,39 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "rustls" +version = "0.20.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f" +dependencies = [ + "log", + "ring", + "sct", + "webpki", +] + +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2" +dependencies = [ + "base64 0.21.2", +] + [[package]] name = "rustversion" version = "1.0.12" @@ -3205,6 +3750,16 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "sec1" version = "0.7.2" @@ -3510,6 +4065,12 @@ dependencies = [ "smallvec", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "spki" version = "0.7.2" @@ -3797,6 +4358,28 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-rustls" +version = "0.23.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" +dependencies = [ + "rustls", + "tokio", + "webpki", +] + +[[package]] +name = "tokio-stream" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.8" @@ -4034,6 +4617,12 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "url" version = "2.4.0" @@ -4045,6 +4634,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf8parse" version = "0.2.1" @@ -4075,6 +4670,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + [[package]] name = "waker-fn" version = "1.1.0" @@ -4245,6 +4846,10 @@ name = "warg-server" version = "0.1.0" dependencies = [ "anyhow", + "aws-config", + "aws-credential-types", + "aws-sdk-s3", + "aws-smithy-client", "axum", "bytes", "chrono", @@ -4255,7 +4860,10 @@ dependencies = [ "diesel_json", "diesel_migrations", "futures", + "hyper", + "hyper-proxy", "indexmap 2.0.0", + "oci-distribution", "secrecy", "serde", "serde_json", @@ -4832,6 +5440,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "which" version = "4.4.0" @@ -5187,6 +5805,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "xmlparser" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d25c75bf9ea12c4040a97f829154768bbbce366287e2dc044af160cd79a13fd" + [[package]] name = "yansi" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index 7b4fe5a6..d7cb2472 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,3 +120,9 @@ regex = "1" wasmparser = "0.108.0" protox = "0.4.1" toml = "0.7.6" +aws-config = "0.55.3" +aws-sdk-s3 = "0.28.0" +aws-credential-types = "0.55.3" +aws-smithy-client = "0.55.3" +hyper = "0.14.13" +hyper-proxy = "0.9.1" \ No newline at end of file diff --git a/crates/protocol/src/registry.rs b/crates/protocol/src/registry.rs index 0d69258e..ab7ca3ff 100644 --- a/crates/protocol/src/registry.rs +++ b/crates/protocol/src/registry.rs @@ -246,6 +246,12 @@ impl From for RecordId { } } +impl From for AnyHash { + fn from(id: RecordId) -> AnyHash { + id.0 + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index 8a1a3d80..92e015c5 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -19,7 +19,7 @@ tempfile = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } tower = { workspace = true } -tower-http = { workspace = true, features = ["trace", "cors"]} +tower-http = { workspace = true, features = ["trace", "cors"] } tracing = { workspace = true } tracing-subscriber = { workspace = true } indexmap = { workspace = true } @@ -29,15 +29,24 @@ bytes = { workspace = true } wasmparser = { workspace = true } secrecy = { workspace = true } toml = { workspace = true } +serde_json = { workspace = true, optional = true } diesel = { workspace = true, features = ["postgres", "serde_json", "chrono"], optional = true } diesel-async = { workspace = true, features = ["postgres", "deadpool"], optional = true } -diesel_json = { workspace = true, optional = true} +diesel_json = { workspace = true, optional = true } diesel_migrations = { workspace = true, optional = true } diesel-derive-enum = { workspace = true, optional = true, features = ["postgres"] } -serde_json = { workspace = true, optional = true } chrono = { workspace = true, optional = true } +aws-config = { workspace = true, optional = true } +aws-sdk-s3 = { workspace = true, optional = true } +aws-credential-types = { workspace = true, optional = true } +aws-smithy-client = { workspace = true, optional = true } +hyper = { workspace = true, optional = true } +hyper-proxy = { workspace = true, optional = true } +oci-distribution = { git = "https://github.com/devigned/oci-distribution", branch = "os-wasi", optional = true } [features] default = [] debug = [] -postgres = ["diesel", "diesel-async", "diesel_json", "diesel_migrations", "diesel-derive-enum", "serde_json", "chrono"] +postgres = ["diesel", "diesel-async", "diesel_json", "diesel_migrations", "diesel-derive-enum", "chrono", "serde_json"] +s3 = ["aws-config", "aws-sdk-s3", "aws-credential-types", "aws-smithy-client", "hyper", "hyper-proxy"] +oci = ["oci-distribution", "serde_json"] diff --git a/crates/server/src/api/mod.rs b/crates/server/src/api/mod.rs index 72e779fb..c6f997b9 100644 --- a/crates/server/src/api/mod.rs +++ b/crates/server/src/api/mod.rs @@ -1,3 +1,4 @@ +use crate::contentstore::ContentStore; use crate::{ policy::{content::ContentPolicy, record::RecordPolicy}, services::CoreService, @@ -7,7 +8,6 @@ use std::{path::PathBuf, sync::Arc}; use tower::ServiceBuilder; use tower_http::{ cors::{Any, CorsLayer}, - services::ServeDir, trace::{DefaultMakeSpan, DefaultOnResponse, TraceLayer}, LatencyUnit, }; @@ -24,9 +24,9 @@ pub fn create_router( content_base_url: Url, core: CoreService, temp_dir: PathBuf, - files_dir: PathBuf, content_policy: Option>, record_policy: Option>, + content_store: Arc, ) -> Router { let router = Router::new(); #[cfg(feature = "debug")] @@ -38,12 +38,11 @@ pub fn create_router( content_base_url, core, temp_dir, - files_dir.clone(), content_policy, record_policy, + content_store, ), ) - .nest_service("/content", ServeDir::new(files_dir)) .layer( ServiceBuilder::new() .layer( diff --git a/crates/server/src/api/v1/mod.rs b/crates/server/src/api/v1/mod.rs index 349fc7b3..ecd95e7a 100644 --- a/crates/server/src/api/v1/mod.rs +++ b/crates/server/src/api/v1/mod.rs @@ -1,3 +1,4 @@ +use crate::contentstore::ContentStore; use crate::{ policy::{content::ContentPolicy, record::RecordPolicy}, services::CoreService, @@ -93,18 +94,18 @@ pub fn create_router( content_base_url: Url, core: CoreService, temp_dir: PathBuf, - files_dir: PathBuf, content_policy: Option>, record_policy: Option>, + content_store: Arc, ) -> Router { let proof_config = proof::Config::new(core.clone()); let package_config = package::Config::new( core.clone(), content_base_url, - files_dir, temp_dir, content_policy, record_policy, + content_store, ); let fetch_config = fetch::Config::new(core); diff --git a/crates/server/src/api/v1/package.rs b/crates/server/src/api/v1/package.rs index 8e0f2a5c..73037a9d 100644 --- a/crates/server/src/api/v1/package.rs +++ b/crates/server/src/api/v1/package.rs @@ -1,5 +1,6 @@ use super::{Json, Path}; use crate::{ + contentstore::ContentStore, datastore::{DataStoreError, RecordStatus}, policy::{ content::{ContentPolicy, ContentPolicyError}, @@ -7,6 +8,8 @@ use crate::{ }, services::CoreService, }; +use axum::body::StreamBody; +use axum::http::header; use axum::{ debug_handler, extract::{BodyStream, State}, @@ -16,10 +19,12 @@ use axum::{ Router, }; use futures::StreamExt; +use std::collections::HashSet; use std::sync::Arc; use std::{collections::HashMap, path::PathBuf}; use tempfile::NamedTempFile; use tokio::io::AsyncWriteExt; +use tokio_util::io::ReaderStream; use url::Url; use warg_api::v1::package::{ ContentSource, MissingContent, PackageError, PackageRecord, PackageRecordState, @@ -36,28 +41,28 @@ use warg_protocol::{ pub struct Config { core_service: CoreService, content_base_url: Url, - files_dir: PathBuf, temp_dir: PathBuf, content_policy: Option>, record_policy: Option>, + content_store: Arc, } impl Config { pub fn new( core_service: CoreService, content_base_url: Url, - files_dir: PathBuf, temp_dir: PathBuf, content_policy: Option>, record_policy: Option>, + content_store: Arc, ) -> Self { Self { core_service, content_base_url, - files_dir, temp_dir, content_policy, record_policy, + content_store, } } @@ -69,28 +74,18 @@ impl Config { "/:log_id/record/:record_id/content/:digest", post(upload_content), ) + .route( + "/:log_id/record/:record_id/content/:digest", + get(fetch_content), + ) .with_state(self) } - fn content_present(&self, digest: &AnyHash) -> bool { - self.content_path(digest).is_file() - } - - fn content_file_name(&self, digest: &AnyHash) -> String { - digest.to_string().replace(':', "-") - } - - fn content_path(&self, digest: &AnyHash) -> PathBuf { - self.files_dir.join(self.content_file_name(digest)) - } - - fn content_url(&self, digest: &AnyHash) -> String { - self.content_base_url - .join("content/") - .unwrap() - .join(&self.content_file_name(digest)) - .unwrap() - .to_string() + fn content_url(&self, log_id: &LogId, record_id: &RecordId, digest: &AnyHash) -> String { + format!( + "{url}v1/package/{log_id}/record/{record_id}/content/{digest}", + url = self.content_base_url, + ) } fn build_missing_content<'a>( @@ -138,6 +133,13 @@ impl PackageApiError { message: message.to_string(), }) } + + fn not_found(message: impl ToString) -> Self { + Self(PackageError::Message { + status: StatusCode::NOT_FOUND.as_u16(), + message: message.to_string(), + }) + } } impl From for PackageApiError { @@ -227,8 +229,25 @@ async fn publish_record( .await?; let record_id = RecordId::package_record::(&record); - let mut missing = record.as_ref().contents(); - missing.retain(|d| !config.content_present(d)); + let mut missing = HashSet::<&AnyHash>::new(); + let version = crate::datastore::get_version_for_release(record.as_ref()); + for key in record.as_ref().contents() { + match version { + Some(version) => { + if !config + .content_store + .content_present(&body.id, key, version.to_string()) + .await + .map_err(PackageApiError::internal_error)? + { + missing.insert(key); + } + } + None => { + missing.insert(key); + } + } + } config .core_service @@ -240,7 +259,7 @@ async fn publish_record( if missing.is_empty() { config .core_service - .submit_package_record(log_id, record_id.clone()) + .submit_package_record(log_id.clone(), record_id.clone()) .await; return Ok(( @@ -300,7 +319,7 @@ async fn get_record( ( digest.clone(), vec![ContentSource::Http { - url: config.content_url(digest), + url: config.content_url(&log_id, &record_id, digest), }], ) }) @@ -371,8 +390,18 @@ async fn upload_content( // Only persist the file if the content was successfully processed res?; - tmp_path - .persist(config.content_path(&digest)) + let version = + crate::datastore::get_release_version(config.core_service.store(), &log_id, &record_id) + .await?; + let package_id = config.core_service.store().get_package_id(&log_id).await?; + let mut tmp_file = tokio::fs::File::open(&tmp_path) + .await + .map_err(PackageApiError::internal_error)?; + + config + .content_store + .store_content(&package_id, &digest, version.to_string(), &mut tmp_file) + .await .map_err(PackageApiError::internal_error)?; // If this is the last content needed, submit the record for processing now @@ -384,13 +413,16 @@ async fn upload_content( { config .core_service - .submit_package_record(log_id, record_id) + .submit_package_record(log_id.clone(), record_id.clone()) .await; } Ok(( StatusCode::CREATED, - [(axum::http::header::LOCATION, config.content_url(&digest))], + [( + header::LOCATION, + config.content_url(&log_id, &record_id, &digest), + )], )) } @@ -437,3 +469,31 @@ async fn process_content( Ok(()) } + +#[debug_handler] +async fn fetch_content( + State(config): State, + Path((log_id, record_id, digest)): Path<(LogId, RecordId, AnyHash)>, +) -> Result { + tracing::info!("fetching content for record `{record_id}` from `{log_id}`"); + + let package_id = config.core_service.store().get_package_id(&log_id).await?; + let version = + crate::datastore::get_release_version(config.core_service.store(), &log_id, &record_id) + .await?; + let file = config + .content_store + .fetch_content(&package_id, &digest, version.to_string()) + .await + .map_err(PackageApiError::not_found)?; + + let stream = ReaderStream::new(file); + let body = StreamBody::new(stream); + let disposition = &format!("attachment; filename=\"{digest}\"", digest = digest.clone()); + let headers = [ + (header::CONTENT_TYPE, "application/wasm; charset=utf-8"), + (header::CONTENT_DISPOSITION, &String::from(disposition)), + ]; + + Ok((headers, body).into_response()) +} diff --git a/crates/server/src/bin/warg-server.rs b/crates/server/src/bin/warg-server.rs index a073aea5..457da72a 100644 --- a/crates/server/src/bin/warg-server.rs +++ b/crates/server/src/bin/warg-server.rs @@ -16,6 +16,17 @@ enum DataStoreKind { Memory, } +#[derive(ValueEnum, Debug, Clone, Copy, PartialEq, Eq, Default)] +enum ContentStoreKind { + #[default] + Local, + #[cfg(feature = "oci")] + #[value(alias("ociv1-1"))] + OCIv1_1, + #[cfg(feature = "s3")] + S3, +} + #[derive(Parser, Debug)] struct Args { /// Use verbose output @@ -38,6 +49,15 @@ struct Args { #[arg(long, env = "WARG_DATA_STORE", default_value = "memory")] data_store: DataStoreKind, + /// The content store to use for the server. + #[arg(long, env = "WARG_CONTENT_STORE", default_value = "local")] + content_store: ContentStoreKind, + + /// The OCI registry URL if content store is set to oci. + #[cfg(feature = "oci")] + #[arg(long, env = "WARG_OCI_REGISTRY_URL", default_value = "localhost:5000")] + oci_registry_url: Option, + /// The database connection URL if data-store is set to postgres. /// /// Prefer using `database-url-file`, or environment variable variation, @@ -56,6 +76,36 @@ struct Args { #[arg(long)] database_run_migrations: bool, + /// The S3 compatible endpoint. + #[cfg(feature = "s3")] + #[arg(long, env = "WARG_S3_ENDPOINT")] + s3_endpoint: Option, + + /// The S3 compatible API key secret. + #[cfg(feature = "s3")] + #[arg(long, env = "WARG_S3_API_KEY_ID")] + s3_api_key_id: Option, + + /// The S3 compatible API key secret. + #[cfg(feature = "s3")] + #[arg(long, env = "WARG_S3_API_KEY_SECRET")] + s3_api_key_secret: Option, + + /// The S3 compatible region. + #[cfg(feature = "s3")] + #[arg(long, env = "WARG_S3_REGION", default_value = "auto")] + s3_region: Option, + + /// The S3 compatible region. + #[cfg(feature = "s3")] + #[arg(long, env = "WARG_S3_BUCKET_NAME", default_value = "warg-registry")] + s3_bucket_name: Option, + + /// The S3 compatible presign time to live in u64 seconds. + #[cfg(feature = "s3")] + #[arg(long, env = "WARG_S3_PRESIGN_TTL", default_value = "3600")] + s3_presign_ttl: Option, + /// The operator key. /// /// Prefer using `operator-key-file`, or environment variable variation. @@ -95,7 +145,7 @@ async fn main() -> Result<()> { let operator_key = PrivateKey::decode(operator_key_str).context("failed to parse operator key")?; - let mut config = Config::new(operator_key, args.content_dir) + let mut config = Config::new(operator_key, args.content_dir.clone()) .with_addr(args.listen) .with_shutdown(shutdown_signal()); @@ -111,6 +161,64 @@ async fn main() -> Result<()> { config = config.with_record_policy(authorized_key_policy); } + let config = match args.content_store { + ContentStoreKind::Local => { + tracing::info!("using local content store"); + config + } + #[cfg(feature = "oci")] + ContentStoreKind::OCIv1_1 => { + use oci_distribution::secrets::RegistryAuth::Anonymous; + use warg_server::contentstore::oci::ociv1_1::OCIv1_1ContentStore; + tracing::info!("using OCIv1.1 content store"); + config.with_content_store( + OCIv1_1ContentStore::new( + args.oci_registry_url.unwrap(), + Anonymous, + &args.content_dir, + ) + .await, + ) + } + #[cfg(feature = "s3")] + ContentStoreKind::S3 => { + use warg_server::contentstore::s3::S3ContentStore; + tracing::info!("using s3 content store"); + config.with_content_store( + S3ContentStore::new( + args.s3_endpoint + .with_context(|| "must specify the s3 compatible endpoint: --s3-endpoint") + .unwrap(), + args.s3_api_key_id + .with_context(|| { + "must specify the s3 compatible API key ID: --s3-api-key-id" + }) + .unwrap(), + args.s3_api_key_secret + .with_context(|| { + "must specify the s3 compatible API key secret: --s3-api-key-secret" + }) + .unwrap(), + args.s3_region + .with_context(|| "must specify the s3 compatible region: --s3-region") + .unwrap(), + args.s3_bucket_name + .with_context(|| { + "must specify the s3 compatible bucket name: --s3-bucket-name" + }) + .unwrap(), + &args.content_dir, + args.s3_presign_ttl + .with_context(|| { + "must specify the s3 compatible presign time to live: --s3-presign-ttl" + }) + .unwrap(), + ) + .await, + ) + } + }; + let config = match args.data_store { #[cfg(feature = "postgres")] DataStoreKind::Postgres => { diff --git a/crates/server/src/contentstore/local.rs b/crates/server/src/contentstore/local.rs new file mode 100644 index 00000000..7eb59ae0 --- /dev/null +++ b/crates/server/src/contentstore/local.rs @@ -0,0 +1,74 @@ +use crate::contentstore::{ContentStore, ContentStoreError}; +use std::path::{Path, PathBuf}; +use tokio::fs::File; +use tokio::io::copy; +use warg_crypto::hash::AnyHash; +use warg_protocol::registry::PackageId; + +#[derive(Clone)] +pub struct LocalContentStore { + files_dir: PathBuf, +} + +impl LocalContentStore { + pub fn new(files_dir: PathBuf) -> Self { + Self { files_dir } + } + + /// Returns the path to the content file for a given content address. + fn content_path(&self, digest: &AnyHash) -> PathBuf { + self.files_dir.join(content_file_name(digest)) + } +} + +/// Returns the file name for a given content address replacing colons with dashes. +fn content_file_name(digest: &AnyHash) -> String { + digest.to_string().replace(':', "-") +} + +#[axum::async_trait] +impl ContentStore for LocalContentStore { + /// Fetch content for a given package. + async fn fetch_content( + &self, + _package_id: &PackageId, + digest: &AnyHash, + _version: String, + ) -> Result { + File::open(self.content_path(digest)) + .await + .map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string())) + } + + /// Store content for a given package. + async fn store_content( + &self, + _package_id: &PackageId, + digest: &AnyHash, + _version: String, + content: &mut File, + ) -> Result { + let file_path = self.content_path(digest); + let mut stored_file = File::create(file_path.clone()) + .await + .map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string()))?; + + copy(content, &mut stored_file) + .await + .map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string()))?; + Ok(file_path.to_string_lossy().to_string()) + } + + /// Check if the content is present in the store. + async fn content_present( + &self, + _package_id: &PackageId, + digest: &AnyHash, + _version: String, + ) -> Result { + let path = self.content_path(digest); + Path::new(&path) + .try_exists() + .map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string())) + } +} diff --git a/crates/server/src/contentstore/mod.rs b/crates/server/src/contentstore/mod.rs new file mode 100644 index 00000000..204e9ed9 --- /dev/null +++ b/crates/server/src/contentstore/mod.rs @@ -0,0 +1,74 @@ +use hyper::Uri; +use thiserror::Error; +use tokio::fs::File; +use warg_crypto::hash::AnyHash; +use warg_protocol::registry::PackageId; + +pub mod local; +#[cfg(feature = "oci")] +pub mod oci; +#[cfg(feature = "s3")] +pub mod s3; + +#[derive(Debug, Error)] +pub enum ContentStoreError { + #[error("content with address `{0}` was not found")] + ContentNotFound(AnyHash), + + #[error("content store internal error: {0}")] + ContentStoreInternalError(String), +} + +pub enum ContentStoreUriSigning { + None, + Presigned(Box), +} + +/// Implemented by content stores that support presigned URIs. +#[axum::async_trait] +pub trait PresignedContentStore { + async fn read_uri( + &self, + package_id: &PackageId, + digest: &AnyHash, + version: String, + ) -> Result; + async fn write_uri( + &self, + package_id: &PackageId, + digest: &AnyHash, + version: String, + ) -> Result; +} + +/// Implemented by content stores. +#[axum::async_trait] +pub trait ContentStore: Send + Sync { + /// Fetch content for a given package. + async fn fetch_content( + &self, + package_id: &PackageId, + digest: &AnyHash, + version: String, + ) -> Result; + + /// Store content for a given package. + async fn store_content( + &self, + package_id: &PackageId, + digest: &AnyHash, + version: String, + content: &mut File, + ) -> Result; + + async fn content_present( + &self, + package_id: &PackageId, + digest: &AnyHash, + version: String, + ) -> Result; + + async fn uri_signing(&self) -> ContentStoreUriSigning { + ContentStoreUriSigning::None + } +} diff --git a/crates/server/src/contentstore/oci/client.rs b/crates/server/src/contentstore/oci/client.rs new file mode 100644 index 00000000..ff992f3d --- /dev/null +++ b/crates/server/src/contentstore/oci/client.rs @@ -0,0 +1,206 @@ +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use anyhow::{Context, Result}; +use oci_distribution::config::{Architecture, Config as DistConfig, ConfigFile, Os}; +use oci_distribution::{ + client, + client::{ClientProtocol, Config, ImageLayer}, + manifest::OciImageManifest, + secrets::RegistryAuth, + Reference, +}; +use serde_json; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::runtime::Handle; +use tokio::sync::RwLock; +use tokio::task::block_in_place; + +use warg_crypto::hash::AnyHash; + +use crate::{ + contentstore::ContentStoreError, contentstore::ContentStoreError::ContentStoreInternalError, +}; + +const COMPONENT_ARTIFACT_TYPE: &str = "application/vnd.bytecodealliance.component.v1+wasm"; +const WASM_LAYER_MEDIA_TYPE: &str = "application/vnd.bytecodealliance.wasm.component.layer.v0+wasm"; +// const COMPONENT_COMPOSE_MANIFEST_MEDIA_TYPE: &str = "application/vnd.bytecodealliance.component.compose.v0+yaml"; + +/// Client for interacting with an OCI registry +pub struct Client { + oci_client: Arc>, + auth: RegistryAuth, + temp_dir: PathBuf, +} + +impl Client { + /// Create a new instance of an OCI client for storing components. + pub async fn new(insecure: bool, auth: RegistryAuth, temp_dir: &PathBuf) -> Self { + let client = oci_distribution::Client::new(Self::build_config(insecure)); + Self { + oci_client: Arc::new(RwLock::new(client)), + auth: auth.into(), + temp_dir: temp_dir.clone(), + } + } + + pub async fn pull( + &self, + reference: impl AsRef, + digest: &AnyHash, + ) -> Result { + let path = self.cached_content_path(digest); + if Path::new(&path) + .try_exists() + .map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string()))? + { + let file = File::open(path) + .await + .map_err(|e| ContentStoreError::ContentStoreInternalError(e.to_string()))?; + return Ok(file); + } + + let reference: Reference = reference + .as_ref() + .parse() + .with_context(|| format!("cannot parse reference {}", reference.as_ref())) + .map_err(|e| ContentStoreInternalError(e.to_string()))?; + + // TODO: fix the higher-level lifetime error that occurs when not using block_in_place and + // block_on. + let result = block_in_place(|| { + Handle::current().block_on(async move { + let mut oci = self.oci_client.write().await; + oci.pull(&reference, &self.auth, vec![WASM_LAYER_MEDIA_TYPE]) + .await + }) + }); + + let image = result.map_err(|e| ContentStoreInternalError(e.to_string()))?; + + let layer = image + .layers + .into_iter() + .find(|l| l.sha256_digest() == digest.to_string()) + .ok_or(ContentStoreInternalError("layer not found".to_string()))?; + let mut file = File::create(self.cached_content_path(digest)) + .await + .map_err(|e| ContentStoreInternalError(e.to_string()))?; + file.write_all(&layer.data) + .await + .map_err(|e| ContentStoreInternalError(e.to_string()))?; + Ok(file) + } + + /// Push a component to an OCI registry. + pub async fn push( + &self, + reference: impl AsRef, + file: &mut File, + digest: &AnyHash, + ) -> Result { + let reference: Reference = reference + .as_ref() + .parse() + .with_context(|| format!("cannot parse reference {}", reference.as_ref())) + .map_err(|e| ContentStoreInternalError(e.to_string()))?; + + let entrypoint = format!("/{}", digest.to_string().strip_prefix("sha256:").unwrap()); + let config = ConfigFile { + architecture: Architecture::Wasm, + os: Os::Wasi, + config: Some(DistConfig { + // use the sha256 hash as the file name for the entrypoint + entrypoint: vec![entrypoint], + ..Default::default() + }), + ..Default::default() + }; + let config_data = + serde_json::to_vec(&config).map_err(|e| ContentStoreInternalError(e.to_string()))?; + let oci_config = Config::oci_v1(config_data, None); + let mut layers = Vec::new(); + let wasm_layer = Self::wasm_layer(file) + .await + .context("cannot create wasm layer") + .map_err(|e| ContentStoreInternalError(e.to_string()))?; + layers.insert(0, wasm_layer); + let mut manifest = OciImageManifest::build(&layers, &oci_config, None); + manifest.artifact_type = Some(COMPONENT_ARTIFACT_TYPE.to_string()); + + // TODO: fix the higher-level lifetime error that occurs when not using block_in_place and + // block_on. + let result = block_in_place(|| { + Handle::current().block_on(async move { + tracing::log::trace!("Pushing component to {:?}", reference); + let mut oci = self.oci_client.write().await; + oci.push(&reference, &layers, oci_config, &self.auth, Some(manifest)) + .await + }) + }); + + result + .map(|push_response| push_response.manifest_url) + .context("cannot push component to the registry") + .map_err(|e| ContentStoreInternalError(e.to_string())) + } + + pub async fn content_exists( + &self, + reference: impl AsRef, + ) -> Result { + let reference: Reference = reference + .as_ref() + .parse() + .with_context(|| format!("cannot parse reference {}", reference.as_ref())) + .map_err(|e| ContentStoreInternalError(e.to_string())) + .unwrap(); + + let mut oci = self.oci_client.write().await; + match oci.fetch_manifest_digest(&reference, &self.auth).await { + Ok(_) => Ok(true), + Err(_) => Ok(false), + } + } + + /// Create a new wasm layer based on a file. + async fn wasm_layer(file: &mut File) -> Result { + tracing::log::trace!("Reading wasm component from {:?}", file); + + let mut contents = vec![]; + file.read_to_end(&mut contents) + .await + .context("cannot read wasm component")?; + + Ok(ImageLayer::new( + contents, + WASM_LAYER_MEDIA_TYPE.to_string(), + None, + )) + } + + /// Returns the path to the content file for a given content address. + fn cached_content_path(&self, digest: &AnyHash) -> PathBuf { + self.temp_dir.join(Self::content_file_name(digest)) + } + + /// Returns the file name for a given content address replacing colons with dashes. + fn content_file_name(digest: &AnyHash) -> String { + digest.to_string().replace(':', "-") + } + + /// Build the OCI client configuration given the insecure option. + fn build_config(insecure: bool) -> client::ClientConfig { + let protocol = if insecure { + ClientProtocol::Http + } else { + ClientProtocol::Https + }; + + client::ClientConfig { + protocol, + ..Default::default() + } + } +} diff --git a/crates/server/src/contentstore/oci/mod.rs b/crates/server/src/contentstore/oci/mod.rs new file mode 100644 index 00000000..8cf00ceb --- /dev/null +++ b/crates/server/src/contentstore/oci/mod.rs @@ -0,0 +1,2 @@ +mod client; +pub mod ociv1_1; diff --git a/crates/server/src/contentstore/oci/ociv1_1.rs b/crates/server/src/contentstore/oci/ociv1_1.rs new file mode 100644 index 00000000..12183bd1 --- /dev/null +++ b/crates/server/src/contentstore/oci/ociv1_1.rs @@ -0,0 +1,71 @@ +use crate::contentstore::oci::client::Client; +use crate::contentstore::{ContentStore, ContentStoreError}; +use oci_distribution::secrets::RegistryAuth; +use std::path::PathBuf; +use tokio::fs::File; +use warg_crypto::hash::AnyHash; +use warg_protocol::registry::PackageId; + +type Auth = RegistryAuth; + +/// Content store for OCI v1.1 registries. +pub struct OCIv1_1ContentStore { + client: Client, + registry_url: String, +} + +impl OCIv1_1ContentStore { + pub async fn new(registry_url: impl Into, auth: Auth, temp_dir: &PathBuf) -> Self { + let client = Client::new(true, auth, temp_dir).await; + Self { + client, + registry_url: registry_url.into(), + } + } + + fn reference(&self, package_id: &PackageId, version: String) -> String { + let (reg_url, namespace, name) = ( + self.registry_url.clone(), + package_id.namespace(), + package_id.name(), + ); + format!("{reg_url}/{namespace}/{name}:{version}") + } +} + +#[axum::async_trait] +impl ContentStore for OCIv1_1ContentStore { + /// Fetch the content from the store. + async fn fetch_content( + &self, + package_id: &PackageId, + digest: &AnyHash, + version: String, + ) -> Result { + let reference = self.reference(package_id, version); + self.client.pull(reference, digest).await + } + + /// Store the content in the store. + async fn store_content( + &self, + package_id: &PackageId, + digest: &AnyHash, + version: String, + content: &mut File, + ) -> Result { + let reference = self.reference(package_id, version); + self.client.push(reference, content, digest).await + } + + /// Check if the content is present in the store. + async fn content_present( + &self, + package_id: &PackageId, + _digest: &AnyHash, + version: String, + ) -> Result { + let reference = self.reference(package_id, version); + self.client.content_exists(reference).await + } +} diff --git a/crates/server/src/contentstore/s3/mod.rs b/crates/server/src/contentstore/s3/mod.rs new file mode 100644 index 00000000..8c8112d5 --- /dev/null +++ b/crates/server/src/contentstore/s3/mod.rs @@ -0,0 +1,288 @@ +use crate::contentstore::ContentStoreError::ContentStoreInternalError; +use crate::contentstore::{ + ContentStore, ContentStoreError, ContentStoreUriSigning, PresignedContentStore, +}; +use aws_credential_types::Credentials; +use aws_sdk_s3; +use aws_sdk_s3::config::Region; +use aws_sdk_s3::presigning::PresigningConfig; +use aws_sdk_s3::primitives::ByteStream; +use axum::http::HeaderValue; +use futures::TryStreamExt; +use hyper::client::HttpConnector; +use hyper::Uri; +use hyper_proxy::{Intercept, Proxy, ProxyConnector}; +use secrecy::{ExposeSecret, SecretString}; +use std::env; +use std::path::{Path, PathBuf}; +use std::str::FromStr; +use std::time::Duration; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tracing::info; +use url::Url; +use warg_crypto::hash::AnyHash; +use warg_protocol::registry::PackageId; + +#[derive(Debug, Clone)] +pub struct S3ContentStore { + client: aws_sdk_s3::Client, + // on R2 there is a max number of buckets per account (default: 1000) + bucket_name: String, + presign_ttl: u64, + temp_dir: PathBuf, +} + +impl S3ContentStore { + pub async fn new( + endpoint: Url, + access_key_id: SecretString, + access_key_secret: SecretString, + region: String, + bucket_name: String, + temp_dir: &PathBuf, + presign_ttl: u64, + ) -> Self { + let creds = Credentials::new( + access_key_id.expose_secret().to_string(), + access_key_secret.expose_secret().to_string(), + None, + None, + "warg-s3-static-provider", + ); + let config_builder = aws_config::from_env() + .region(Region::new(region.clone())) + .endpoint_url(endpoint.clone()) + .credentials_provider(creds); + let config = match env::var("HTTP_PROXY") { + Ok(proxy_env) => { + let proxy_str = proxy_env.as_str(); + let proxy_uri = Uri::from_str(proxy_str).unwrap(); + let proxy_connector = { + let proxy = Proxy::new(Intercept::All, proxy_uri.clone()); + let connector = HttpConnector::new(); + ProxyConnector::from_proxy(connector, proxy).unwrap() + }; + // need to ensure the `hyper` feature of smithy-client is enabled + let hyper_client = + aws_smithy_client::hyper_ext::Adapter::builder().build(proxy_connector); + info!("Using proxy {}", proxy_str); + aws_sdk_s3::config::Builder::from(&config_builder.load().await) + .http_connector(hyper_client) + .build() + } + _ => (&config_builder.load().await).into(), + }; + let client = aws_sdk_s3::Client::from_conf(config); + Self { + client, + bucket_name, + temp_dir: temp_dir.clone(), + presign_ttl, + } + } + + /// Returns the path to the content file for a given content address. + fn cached_content_path(&self, digest: &AnyHash) -> PathBuf { + self.temp_dir.join(Self::content_file_name(digest)) + } + + /// Returns the file name for a given content address replacing colons with dashes. + fn content_file_name(digest: &AnyHash) -> String { + digest.to_string().replace(':', "-") + } + + fn content_store_name(package_id: &PackageId, version: &str, digest: &AnyHash) -> String { + format!( + "{}-{}-{}-{}", + package_id.namespace(), + package_id.name(), + version, + Self::content_file_name(digest) + ) + } + + fn get_presign_config(&self) -> Result { + Ok(PresigningConfig::builder() + .expires_in(Duration::from_secs(self.presign_ttl)) + .build() + .map_err(|e| { + ContentStoreInternalError(format!("cannot build presigning config: {}", e)) + })?) + } +} + +#[axum::async_trait] +impl ContentStore for S3ContentStore { + async fn fetch_content( + &self, + package_id: &PackageId, + digest: &AnyHash, + version: String, + ) -> Result { + let path = self.cached_content_path(digest); + if Path::new(&path) + .try_exists() + .map_err(|e| ContentStoreInternalError(e.to_string()))? + { + let file = File::open(path) + .await + .map_err(|e| ContentStoreInternalError(e.to_string()))?; + return Ok(file); + } + + let mut object = self + .client + .get_object() + .bucket(self.bucket_name.clone()) + .key(S3ContentStore::content_store_name( + package_id, &version, digest, + )) + .send() + .await + .map_err(|e| ContentStoreInternalError(e.to_string()))?; + + let mut file = File::create(path) + .await + .map_err(|e| ContentStoreInternalError(e.to_string()))?; + + while let Some(bits) = object + .body + .try_next() + .await + .map_err(|e| ContentStoreInternalError(e.to_string()))? + { + file.write_all(&bits) + .await + .map_err(|e| ContentStoreInternalError(e.to_string()))?; + } + Ok(file) + } + + async fn store_content( + &self, + package_id: &PackageId, + digest: &AnyHash, + version: String, + content: &mut File, + ) -> Result { + let mut buf: Vec = Vec::new(); + content.read_to_end(&mut buf).await.map_err(|e| { + ContentStoreInternalError(format!( + "cannot read content for package {} version {}: {}", + package_id, + version.clone(), + e + )) + })?; + + self.client + .put_object() + .bucket(self.bucket_name.clone()) + .body(ByteStream::from(buf)) + .key(S3ContentStore::content_store_name( + package_id, &version, digest, + )) + .content_type("application/wasm".to_string()) + .customize() + .await + .map_err(|e| { + ContentStoreInternalError(format!( + "cannot customize request for package {} version {}: {}", + package_id, + version.clone(), + e + )) + })? + // add a header so that Cloudflare will automatically create the bucket if it doesn't exist + .mutate_request(|req| { + req.headers_mut().insert( + "cf-create-bucket-if-missing", + HeaderValue::from_static("true"), + ); + }) + .send() + .await + .map_err(|e| { + ContentStoreInternalError(format!( + "cannot store content for package {} version {}: {}", + package_id, version, e + )) + })?; + + Ok(digest.to_string()) + } + + async fn content_present( + &self, + package_id: &PackageId, + digest: &AnyHash, + version: String, + ) -> Result { + self.client + .head_object() + .bucket(self.bucket_name.clone()) + .key(S3ContentStore::content_store_name( + package_id, &version, digest, + )) + .send() + .await + .map(|_| true) + .or_else(|_| Ok(false)) + } + + async fn uri_signing(&self) -> ContentStoreUriSigning { + ContentStoreUriSigning::Presigned(Box::new(self.clone())) + } +} + +#[axum::async_trait] +impl PresignedContentStore for S3ContentStore { + async fn read_uri( + &self, + package_id: &PackageId, + digest: &AnyHash, + version: String, + ) -> Result { + let config = self.get_presign_config()?; + + let presigned = self + .client + .get_object() + .bucket(self.bucket_name.clone()) + .key(S3ContentStore::content_store_name( + package_id, &version, digest, + )) + .presigned(config) + .await + .map_err(|e| { + ContentStoreInternalError(format!("cannot generate presigned Uri: {e}")) + })?; + + Ok(presigned.uri().clone()) + } + + async fn write_uri( + &self, + package_id: &PackageId, + digest: &AnyHash, + version: String, + ) -> Result { + let config = self.get_presign_config()?; + + let presigned = self + .client + .put_object() + .bucket(self.bucket_name.clone()) + .key(S3ContentStore::content_store_name( + package_id, &version, digest, + )) + .presigned(config) + .await + .map_err(|e| { + ContentStoreInternalError(format!("cannot generate presigned Uri: {e}")) + })?; + + Ok(presigned.uri().clone()) + } +} diff --git a/crates/server/src/datastore/memory.rs b/crates/server/src/datastore/memory.rs index 4f229f95..3dee712d 100644 --- a/crates/server/src/datastore/memory.rs +++ b/crates/server/src/datastore/memory.rs @@ -78,6 +78,7 @@ struct State { package_ids: BTreeSet, checkpoints: IndexMap>, records: HashMap>, + names: HashMap, } /// Represents an in-memory data store. @@ -232,6 +233,10 @@ impl DataStore for MemoryDataStore { }); let mut state = self.0.write().await; + state + .names + .entry(log_id.clone()) + .or_insert_with(|| package_id.clone()); let prev = state.records.entry(log_id.clone()).or_default().insert( record_id.clone(), RecordStatus::Pending(PendingRecord::Package { @@ -590,6 +595,16 @@ impl DataStore for MemoryDataStore { }) } + async fn get_package_id(&self, log_id: &LogId) -> Result { + self.0 + .read() + .await + .names + .get(log_id) + .map(|package_id| package_id.clone()) + .ok_or_else(|| DataStoreError::LogNotFound(log_id.clone())) + } + async fn verify_package_record_signature( &self, log_id: &LogId, diff --git a/crates/server/src/datastore/mod.rs b/crates/server/src/datastore/mod.rs index fe92797a..e9e9d570 100644 --- a/crates/server/src/datastore/mod.rs +++ b/crates/server/src/datastore/mod.rs @@ -1,21 +1,24 @@ -use futures::Stream; use std::{collections::HashSet, pin::Pin}; + +use futures::Stream; use thiserror::Error; + +pub use memory::*; +#[cfg(feature = "postgres")] +pub use postgres::*; use warg_crypto::{hash::AnyHash, signing::KeyID}; +use warg_protocol::package::PackageEntry; use warg_protocol::{ operator, package, registry::{LogId, LogLeaf, MapCheckpoint, PackageId, RecordId}, - ProtoEnvelope, SerdeEnvelope, + ProtoEnvelope, SerdeEnvelope, Version, }; +use PackageEntry::Release; mod memory; #[cfg(feature = "postgres")] mod postgres; -pub use memory::*; -#[cfg(feature = "postgres")] -pub use postgres::*; - #[derive(Debug, Error)] pub enum DataStoreError { #[error("a conflicting operation was processed: update to the latest checkpoint and try the operation again")] @@ -231,6 +234,9 @@ pub trait DataStore: Send + Sync { limit: u16, ) -> Result>, DataStoreError>; + /// Gets the package ID for the given log ID. + async fn get_package_id(&self, log_id: &LogId) -> Result; + /// Gets an operator record. async fn get_operator_record( &self, @@ -264,3 +270,24 @@ pub trait DataStore: Send + Sync { anyhow::bail!("not implemented") } } + +pub fn get_version_for_release(record: &package::PackageRecord) -> Option<&Version> { + record.entries.iter().find_map(|entry| match entry { + Release { + version, + content: _, + } => Some(version), + _ => None, + }) +} + +pub async fn get_release_version( + data_store: &dyn DataStore, + log_id: &LogId, + record_id: &RecordId, +) -> Result { + let record = data_store.get_package_record(&log_id, &record_id).await?; + get_version_for_release(&record.envelope.as_ref()) + .cloned() + .ok_or_else(|| DataStoreError::RecordNotFound(record_id.clone())) +} diff --git a/crates/server/src/datastore/postgres/mod.rs b/crates/server/src/datastore/postgres/mod.rs index 30163ab8..4eec3d59 100644 --- a/crates/server/src/datastore/postgres/mod.rs +++ b/crates/server/src/datastore/postgres/mod.rs @@ -747,6 +747,20 @@ impl DataStore for PostgresDataStore { get_records(&mut conn, log_id, checkpoint_id, since, limit as i64).await } + async fn get_package_id( + &self, + log_id: &LogId, + ) -> std::result::Result { + let mut conn = self.pool.get().await?; + schema::logs::table + .select(schema::logs::name) + .filter(schema::logs::log_id.eq(TextRef(log_id))) + .first::>(conn.as_mut()) + .await + .map(|name| PackageId::new(name.unwrap()).unwrap()) + .map_err(|_| DataStoreError::LogNotFound(log_id.clone())) + } + async fn get_operator_record( &self, log_id: &LogId, diff --git a/crates/server/src/datastore/postgres/models.rs b/crates/server/src/datastore/postgres/models.rs index f2b5fbe0..ae1a85ae 100644 --- a/crates/server/src/datastore/postgres/models.rs +++ b/crates/server/src/datastore/postgres/models.rs @@ -58,7 +58,7 @@ impl<'a, T: std::fmt::Debug + Display> ToSql for TextRef<'a } } -#[derive(Insertable)] +#[derive(Insertable, Queryable, Selectable)] #[diesel(table_name = logs)] pub struct NewLog<'a, V> where diff --git a/crates/server/src/lib.rs b/crates/server/src/lib.rs index d0ad90f0..557a47af 100644 --- a/crates/server/src/lib.rs +++ b/crates/server/src/lib.rs @@ -1,6 +1,8 @@ +use crate::contentstore::local::LocalContentStore; use crate::{api::create_router, datastore::MemoryDataStore}; use anyhow::{Context, Result}; use axum::Router; +use contentstore::ContentStore; use datastore::DataStore; use futures::Future; use policy::{content::ContentPolicy, record::RecordPolicy}; @@ -19,6 +21,7 @@ use warg_crypto::signing::PrivateKey; pub mod api; pub mod args; +pub mod contentstore; pub mod datastore; pub mod policy; pub mod services; @@ -39,6 +42,7 @@ pub struct Config { checkpoint_interval: Option, content_policy: Option>, record_policy: Option>, + content_store: Arc, } impl std::fmt::Debug for Config { @@ -72,12 +76,13 @@ impl Config { operator_key, addr: None, data_store: None, - content_dir, + content_dir: content_dir.clone(), content_base_url: None, shutdown: None, checkpoint_interval: None, content_policy: None, record_policy: None, + content_store: Arc::new(LocalContentStore::new(content_dir.join("files"))), } } @@ -103,6 +108,14 @@ impl Config { self } + /// Specify the content store to use. + /// + /// If this is not specified, the server will use a local content store. + pub fn with_content_store(mut self, store: impl ContentStore + 'static) -> Self { + self.content_store = Arc::new(store); + self + } + /// Specify the data store to use via a boxed data store. /// /// If this is not specified, the server will use an in-memory data store. @@ -219,9 +232,9 @@ impl Server { content_base_url, core, temp_dir, - files_dir, self.config.content_policy, self.config.record_policy, + self.config.content_store, ); Ok(InitializedServer { diff --git a/tests/server.rs b/tests/server.rs index 60d029aa..47c7713f 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -421,8 +421,8 @@ async fn test_custom_content_url(config: &Config) -> Result<()> { .await?; let expected_url = format!( - "https://example.com/content/{digest}", - digest = digest.to_string().replace(':', "-") + "https://example.com/v1/package/{log_id}/record/{record_id}/content/{digest}", + record_id = record.id, ); match record.state {