From 6ea0bc6db0b07f2afcbcf0442811b96752daa54d Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Sun, 15 Oct 2023 22:29:12 +0800 Subject: [PATCH 1/8] feat(service/Cloudflare kv):support cloudflare KV init cloudflare-kv implement builder. Implement read implement scan feat(service/Cloudflare kv):support cloudflare KV --- core/Cargo.toml | 3 +- core/src/services/cloudflare_kv/backend.rs | 444 +++++++++++++++++++++ core/src/services/cloudflare_kv/docs.md | 22 + core/src/services/cloudflare_kv/error.rs | 83 ++++ core/src/services/cloudflare_kv/mod.rs | 21 + core/src/services/mod.rs | 5 + core/src/types/scheme.rs | 4 + 7 files changed, 581 insertions(+), 1 deletion(-) create mode 100644 core/src/services/cloudflare_kv/backend.rs create mode 100644 core/src/services/cloudflare_kv/docs.md create mode 100644 core/src/services/cloudflare_kv/error.rs create mode 100644 core/src/services/cloudflare_kv/mod.rs diff --git a/core/Cargo.toml b/core/Cargo.toml index ba4c955c58e3..da99cd645c81 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -116,6 +116,7 @@ services-azdls = [ "reqsign?/reqwest_request", ] services-cacache = ["dep:cacache"] +services-cloudflare-kv = [] services-cos = [ "dep:reqsign", "reqsign?/services-tencent", @@ -230,9 +231,9 @@ foundationdb = { version = "0.8.0", features = [ futures = { version = "0.3", default-features = false, features = ["std"] } governor = { version = "0.5", optional = true, features = ["std"] } hdrs = { version = "0.3.0", optional = true, features = ["async_file"] } +hrana-client-proto = { version = "0.2.1", optional = true } http = "0.2.9" hyper = "0.14" -hrana-client-proto = { version = "0.2.1", optional = true } lazy-regex = { version = "3.0.1", optional = true } log = "0.4" madsim = { version = "0.2.21", optional = true } diff --git a/core/src/services/cloudflare_kv/backend.rs b/core/src/services/cloudflare_kv/backend.rs new file mode 100644 index 000000000000..a544fb86adb0 --- /dev/null +++ b/core/src/services/cloudflare_kv/backend.rs @@ -0,0 +1,444 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#![allow(dead_code)] +use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; + +use async_trait::async_trait; +use http::header; +use http::Request; +use http::Response; +use http::StatusCode; +use serde::Deserialize; + +use super::error::parse_error; +use crate::raw::adapters::kv; +use crate::raw::*; +use crate::ErrorKind; +use crate::*; + +#[doc = include_str!("docs.md")] +#[derive(Default)] +pub struct CloudflareKvBuilder { + /// The token used to authenticate with CloudFlare. + token: Option, + /// The account ID used to authenticate with CloudFlare. Used as URI path parameter. + account_id: Option, + /// The namespace ID. Used as URI path parameter. + namespace_id: Option, + + /// The HTTP client used to communicate with CloudFlare. + http_client: Option, + /// Root within this backend. + root: Option, +} + +impl Debug for CloudflareKvBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CloudFlareKvBuilder") + .field("account_id", &self.account_id) + .field("namespace_id", &self.namespace_id) + .field("root", &self.root) + .finish() + } +} + +impl CloudflareKvBuilder { + /// Set the token used to authenticate with CloudFlare. + pub fn token(&mut self, token: &str) -> &mut Self { + if !token.is_empty() { + self.token = Some(token.to_string()) + } + self + } + + /// Set the account ID used to authenticate with CloudFlare. + pub fn account_id(&mut self, account_id: &str) -> &mut Self { + if !account_id.is_empty() { + self.account_id = Some(account_id.to_string()) + } + self + } + + /// Set the namespace ID. + pub fn namespace_id(&mut self, namespace_id: &str) -> &mut Self { + if !namespace_id.is_empty() { + self.namespace_id = Some(namespace_id.to_string()) + } + self + } + + /// Set the root within this backend. + pub fn root(&mut self, root: &str) -> &mut Self { + if !root.is_empty() { + self.root = Some(root.to_string()) + } + self + } +} + +impl Builder for CloudflareKvBuilder { + const SCHEME: Scheme = Scheme::CloudflareKv; + + type Accessor = CloudflareKvBackend; + + fn from_map(map: HashMap) -> Self { + let mut builder = Self::default(); + map.get("token").map(|v| builder.token(v)); + map.get("account_id").map(|v| builder.account_id(v)); + map.get("namespace_id").map(|v| builder.namespace_id(v)); + map.get("root").map(|v| builder.root(v)); + builder + } + + fn build(&mut self) -> Result { + let mut authorization = None; + if let Some(token) = &self.token { + authorization = Some(format_authorization_by_bearer(token)?) + } + + let Some(account_id) = self.account_id.clone() else { + return Err(Error::new( + ErrorKind::ConfigInvalid, + "account_id is required", + )); + }; + + let Some(namespace_id) = self.namespace_id.clone() else { + return Err(Error::new( + ErrorKind::ConfigInvalid, + "namespace_id is required", + )); + }; + + let client = if let Some(client) = self.http_client.take() { + client + } else { + HttpClient::new().map_err(|err| { + err.with_operation("Builder::build") + .with_context("service", Scheme::CloudflareKv) + })? + }; + + let root = normalize_root( + self.root + .clone() + .unwrap_or_else(|| "/".to_string()) + .as_str(), + ); + + Ok(kv::Backend::new(Adapter { + authorization, + account_id, + namespace_id, + client, + }) + .with_root(&root)) + } +} + +pub type CloudflareKvBackend = kv::Backend; + +#[derive(Clone)] +pub struct Adapter { + authorization: Option, + account_id: String, + namespace_id: String, + client: HttpClient, +} + +impl Debug for Adapter { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Adapter") + .field("account_id", &self.account_id) + .field("namespace_id", &self.namespace_id) + .finish() + } +} + +impl Adapter { + fn load_token(&self, mut req: Request) -> Result> { + if let Some(auth) = &self.authorization { + req.headers_mut() + .insert(header::AUTHORIZATION, auth.parse().unwrap()); + } + Ok(req) + } + + #[inline] + pub async fn send(&self, req: Request) -> Result> { + self.client.send(req).await + } +} + +#[async_trait] +impl kv::Adapter for Adapter { + fn metadata(&self) -> kv::Metadata { + kv::Metadata::new( + Scheme::CloudflareKv, + &self.namespace_id, + Capability { + read: true, + write: true, + list: true, + + ..Default::default() + }, + ) + } + + async fn get(&self, path: &str) -> Result>> { + let url = format!( + r"https://api.cloudflare.com/client/v4/accounts/{}/storage/kv/namespaces/{}", + self.account_id, self.namespace_id + ); + let url = format!("{}/values/{}", url, path); + let mut req = Request::get(&url); + req = req.header(header::CONTENT_TYPE, "application/json"); + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + req = self.load_token(req)?; + let resp = self.client.send(req).await?; + let status = resp.status(); + match status { + StatusCode::OK => { + let body = resp.into_body().bytes().await?; + Ok(Some(body.into())) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn set(&self, path: &str, value: &[u8]) -> Result<()> { + let url = format!( + r"https://api.cloudflare.com/client/v4/accounts/{}/storage/kv/namespaces/{}", + self.account_id, self.namespace_id + ); + let url = format!("{}/values/{}", url, path); + let mut req = Request::put(&url); + req = req.header(header::CONTENT_TYPE, "multipart/form-data"); + let multipart = Multipart::new(); + let multipart = multipart + .part(FormDataPart::new("metadata").content(serde_json::Value::Null.to_string())) + .part(FormDataPart::new("value").content(value.to_vec())); + let mut req = multipart.apply(req)?; + req = self.load_token(req)?; + let resp = self.client.send(req).await?; + let status = resp.status(); + match status { + StatusCode::OK => { + let body = resp.into_body().bytes().await?; + let response: CfKvResponse = serde_json::from_slice(&body).map_err(|e| { + Error::new( + crate::ErrorKind::Unexpected, + &format!("failed to parse error response: {}", e), + ) + })?; + if !response.success { + return Err(Error::new( + crate::ErrorKind::Unexpected, + &String::from_utf8_lossy(&body), + )); + } + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn delete(&self, path: &str) -> Result<()> { + let url = format!( + r"https://api.cloudflare.com/client/v4/accounts/{}/storage/kv/namespaces/{}", + self.account_id, self.namespace_id + ); + let url = format!("{}/values/{}", url, path); + let mut req = Request::delete(&url); + req = req.header(header::CONTENT_TYPE, "application/json"); + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + req = self.load_token(req)?; + let resp = self.client.send(req).await?; + let status = resp.status(); + match status { + StatusCode::OK => { + let body = resp.into_body().bytes().await?; + let response: CfKvResponse = serde_json::from_slice(&body).map_err(|e| { + Error::new( + crate::ErrorKind::Unexpected, + &format!("failed to parse error response: {}", e), + ) + })?; + if !response.success { + return Err(Error::new( + crate::ErrorKind::Unexpected, + &String::from_utf8_lossy(&body), + )); + } + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn scan(&self, path: &str) -> Result> { + let url = format!( + r"https://api.cloudflare.com/client/v4/accounts/{}/storage/kv/namespaces/{}", + self.account_id, self.namespace_id + ); + let mut url = format!("{}/keys", url); + if path.len() > 0 { + url = format!("{}?prefix={}", url, path); + } + let mut req = Request::get(&url); + req = req.header(header::CONTENT_TYPE, "application/json"); + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + req = self.load_token(req)?; + let resp = self.client.send(req).await?; + let status = resp.status(); + match status { + StatusCode::OK => { + let body = resp.into_body().bytes().await?; + let response: CfKvScanResponse = serde_json::from_slice(&body).map_err(|e| { + Error::new( + crate::ErrorKind::Unexpected, + &format!("failed to parse error response: {}", e), + ) + })?; + if !response.success { + return Err(Error::new( + crate::ErrorKind::Unexpected, + &String::from_utf8_lossy(&body), + )); + } + Ok(response.result.into_iter().map(|r| r.name).collect()) + } + _ => Err(parse_error(resp).await?), + } + } +} + +#[derive(Debug, Deserialize)] +pub(crate) struct CfKvResponse { + pub(crate) errors: Vec, + messages: Vec, + result: serde_json::Value, + success: bool, +} + +#[derive(Debug, Deserialize)] +struct CfKvScanResponse { + errors: Vec, + messages: Vec, + result: Vec, + success: bool, + result_info: Option, +} + +#[derive(Debug, Deserialize)] +struct CfKvScanResult { + expiration: i64, + name: String, + metadata: serde_json::Value, +} + +#[derive(Debug, Deserialize)] +struct CfKvResultInfo { + count: i64, + cursor: String, +} + +#[derive(Debug, Deserialize)] +pub struct CfKvError { + pub message: String, + pub code: i32, +} + +#[derive(Debug, Deserialize)] +pub struct CfKvMessage { + pub message: String, + pub code: i32, +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_deserialize_scan_json_response() { + let json_str = r#"{ + "errors": [], + "messages": [], + "result": [ + { + "expiration": 1577836800, + "metadata": { + "someMetadataKey": "someMetadataValue" + }, + "name": "My-Key" + } + ], + "success": true, + "result_info": { + "count": 1, + "cursor": "6Ck1la0VxJ0djhidm1MdX2FyDGxLKVeeHZZmORS_8XeSuhz9SjIJRaSa2lnsF01tQOHrfTGAP3R5X1Kv5iVUuMbNKhWNAXHOl6ePB0TUL8nw" + } + }"#; + + let response: CfKvScanResponse = serde_json::from_slice(json_str.as_bytes()).unwrap(); + + assert_eq!(response.errors.len(), 0); + assert_eq!(response.messages.len(), 0); + assert_eq!(response.result.len(), 1); + assert_eq!(response.result[0].expiration, 1577836800); + assert_eq!(response.result[0].name, "My-Key"); + assert_eq!( + response.result[0].metadata, + serde_json::json!({"someMetadataKey": "someMetadataValue"}) + ); + assert_eq!(response.success, true); + assert!(response.result_info.is_some()); + match response.result_info { + Some(result_info) => { + assert_eq!(result_info.count, 1); + assert_eq!(result_info.cursor, "6Ck1la0VxJ0djhidm1MdX2FyDGxLKVeeHZZmORS_8XeSuhz9SjIJRaSa2lnsF01tQOHrfTGAP3R5X1Kv5iVUuMbNKhWNAXHOl6ePB0TUL8nw"); + } + None => {} + } + } + + #[test] + fn test_deserialize_json_response() { + let json_str = r#"{ + "errors": [], + "messages": [], + "result": {}, + "success": true + }"#; + + let response: CfKvResponse = serde_json::from_slice(json_str.as_bytes()).unwrap(); + + assert_eq!(response.errors.len(), 0); + assert_eq!(response.messages.len(), 0); + assert_eq!(response.result, serde_json::json!({})); + assert_eq!(response.success, true); + } +} diff --git a/core/src/services/cloudflare_kv/docs.md b/core/src/services/cloudflare_kv/docs.md new file mode 100644 index 000000000000..06c6006359ee --- /dev/null +++ b/core/src/services/cloudflare_kv/docs.md @@ -0,0 +1,22 @@ +## Capabilities + +This service can be used to: + +- [x] stat +- [x] read +- [x] write +- [x] create_dir +- [x] delete +- [ ] copy +- [ ] rename +- [ ] ~~list~~ +- [x] scan +- [ ] ~~presign~~ +- [ ] blocking + +## Configuration + +- `root`: Set the working directory of `OpenDAL` +- `token`: Set the token of cloudflare api +- `account_id`: Set the account identifier of cloudflare +- `namespace_id`: Set the namespace identifier of d1 diff --git a/core/src/services/cloudflare_kv/error.rs b/core/src/services/cloudflare_kv/error.rs new file mode 100644 index 000000000000..9b2c2ff941eb --- /dev/null +++ b/core/src/services/cloudflare_kv/error.rs @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use bytes::Buf; +use http::Response; +use http::StatusCode; + +use crate::raw::*; +use crate::Error; +use crate::ErrorKind; +use crate::Result; + +use serde_json::de; + +use super::backend::CfKvError; +use super::backend::CfKvResponse; + +/// Parse error response into Error. +pub async fn parse_error(resp: Response) -> Result { + let (parts, body) = resp.into_parts(); + let bs = body.bytes().await?; + + let (mut kind, mut retryable) = match parts.status { + StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), + // Some services (like owncloud) return 403 while file locked. + StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, true), + // Allowing retry for resource locked. + StatusCode::LOCKED => (ErrorKind::Unexpected, true), + StatusCode::INTERNAL_SERVER_ERROR + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true), + _ => (ErrorKind::Unexpected, false), + }; + + let (message, err) = de::from_reader::<_, CfKvResponse>(bs.clone().reader()) + .map(|err| (format!("{err:?}"), Some(err))) + .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None)); + + if let Some(err) = err { + (kind, retryable) = parse_cfkv_error_code(err.errors).unwrap_or((kind, retryable)); + } + + let mut err = Error::new(kind, &message); + + err = with_error_response_context(err, parts); + + if retryable { + err = err.set_temporary(); + } + + Ok(err) +} + +pub fn parse_cfkv_error_code(errors: Vec) -> Option<(ErrorKind, bool)> { + if errors.is_empty() { + return None; + } + + match errors[0].code { + // The request is malformed: failed to decode id. + 7400 => Some((ErrorKind::Unexpected, false)), + // no such column: Xxxx. + 7500 => Some((ErrorKind::NotFound, false)), + // Authentication error. + 10000 => Some((ErrorKind::PermissionDenied, false)), + _ => None, + } +} diff --git a/core/src/services/cloudflare_kv/mod.rs b/core/src/services/cloudflare_kv/mod.rs new file mode 100644 index 000000000000..fa09aa3d0024 --- /dev/null +++ b/core/src/services/cloudflare_kv/mod.rs @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod backend; +mod error; + +pub use backend::CloudflareKvBuilder as CloudflareKv; diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index d69ea5b0728f..ffd027ac8c69 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -29,6 +29,11 @@ mod azdls; #[cfg(feature = "services-azdls")] pub use azdls::Azdls; +#[cfg(feature = "services-cloudflare-kv")] +mod cloudflare_kv; +#[cfg(feature = "services-cloudflare-kv")] +pub use self::cloudflare_kv::CloudflareKv; + #[cfg(feature = "services-cos")] mod cos; #[cfg(feature = "services-cos")] diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index e9b16164055a..b2dc3d5917ab 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -40,6 +40,8 @@ pub enum Scheme { Azdls, /// [cacache][crate::services::Cacache]: cacache backend support. Cacache, + /// [cloudflare-kv][crate::services::CloudflareKv]: Cloudflare KV services. + CloudflareKv, /// [cos][crate::services::Cos]: Tencent Cloud Object Storage services. Cos, /// [d1][crate::services::D1]: D1 services @@ -267,6 +269,7 @@ impl FromStr for Scheme { // And abfs is widely used in hadoop ecosystem, keep it for easy to use. "azdls" | "azdfs" | "abfs" => Ok(Scheme::Azdls), "cacache" => Ok(Scheme::Cacache), + "cloudflare_kv" => Ok(Scheme::CloudflareKv), "cos" => Ok(Scheme::Cos), "d1" => Ok(Scheme::D1), "dashmap" => Ok(Scheme::Dashmap), @@ -318,6 +321,7 @@ impl From for &'static str { Scheme::Azblob => "azblob", Scheme::Azdls => "azdls", Scheme::Cacache => "cacache", + Scheme::CloudflareKv => "cloudflare_kv", Scheme::Cos => "cos", Scheme::D1 => "d1", Scheme::Dashmap => "dashmap", From ce42c98250716366ce7ceef866a93e9462a4e6bb Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Thu, 19 Oct 2023 22:34:48 +0800 Subject: [PATCH 2/8] fix: fmt --- core/src/types/scheme.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index b2dc3d5917ab..d5038b57cfc3 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -40,8 +40,8 @@ pub enum Scheme { Azdls, /// [cacache][crate::services::Cacache]: cacache backend support. Cacache, - /// [cloudflare-kv][crate::services::CloudflareKv]: Cloudflare KV services. - CloudflareKv, + /// [cloudflare-kv][crate::services::CloudflareKv]: Cloudflare KV services. + CloudflareKv, /// [cos][crate::services::Cos]: Tencent Cloud Object Storage services. Cos, /// [d1][crate::services::D1]: D1 services @@ -269,7 +269,7 @@ impl FromStr for Scheme { // And abfs is widely used in hadoop ecosystem, keep it for easy to use. "azdls" | "azdfs" | "abfs" => Ok(Scheme::Azdls), "cacache" => Ok(Scheme::Cacache), - "cloudflare_kv" => Ok(Scheme::CloudflareKv), + "cloudflare_kv" => Ok(Scheme::CloudflareKv), "cos" => Ok(Scheme::Cos), "d1" => Ok(Scheme::D1), "dashmap" => Ok(Scheme::Dashmap), @@ -321,7 +321,7 @@ impl From for &'static str { Scheme::Azblob => "azblob", Scheme::Azdls => "azdls", Scheme::Cacache => "cacache", - Scheme::CloudflareKv => "cloudflare_kv", + Scheme::CloudflareKv => "cloudflare_kv", Scheme::Cos => "cos", Scheme::D1 => "d1", Scheme::Dashmap => "dashmap", From a031aafc9db6bf1001fb3511dc39f737df53fc91 Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Sat, 21 Oct 2023 20:52:12 +0800 Subject: [PATCH 3/8] fix .. --- core/src/services/cloudflare_kv/backend.rs | 145 +++++++-------------- 1 file changed, 45 insertions(+), 100 deletions(-) diff --git a/core/src/services/cloudflare_kv/backend.rs b/core/src/services/cloudflare_kv/backend.rs index a544fb86adb0..5e74e96d5224 100644 --- a/core/src/services/cloudflare_kv/backend.rs +++ b/core/src/services/cloudflare_kv/backend.rs @@ -14,7 +14,6 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -#![allow(dead_code)] use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; @@ -22,7 +21,6 @@ use std::fmt::Formatter; use async_trait::async_trait; use http::header; use http::Request; -use http::Response; use http::StatusCode; use serde::Deserialize; @@ -107,10 +105,10 @@ impl Builder for CloudflareKvBuilder { } fn build(&mut self) -> Result { - let mut authorization = None; - if let Some(token) = &self.token { - authorization = Some(format_authorization_by_bearer(token)?) - } + let authorization = match &self.token { + Some(token) => format_authorization_by_bearer(token)?, + None => return Err(Error::new(ErrorKind::ConfigInvalid, "token is required")), + }; let Some(account_id) = self.account_id.clone() else { return Err(Error::new( @@ -142,11 +140,17 @@ impl Builder for CloudflareKvBuilder { .as_str(), ); + let url_prefix = format!( + r"https://api.cloudflare.com/client/v4/accounts/{}/storage/kv/namespaces/{}", + account_id, namespace_id + ); + Ok(kv::Backend::new(Adapter { authorization, account_id, namespace_id, client, + url_prefix, }) .with_root(&root)) } @@ -156,10 +160,11 @@ pub type CloudflareKvBackend = kv::Backend; #[derive(Clone)] pub struct Adapter { - authorization: Option, + authorization: String, account_id: String, namespace_id: String, client: HttpClient, + url_prefix: String, } impl Debug for Adapter { @@ -172,18 +177,11 @@ impl Debug for Adapter { } impl Adapter { - fn load_token(&self, mut req: Request) -> Result> { - if let Some(auth) = &self.authorization { - req.headers_mut() - .insert(header::AUTHORIZATION, auth.parse().unwrap()); - } + fn sign(&self, mut req: Request) -> Result> { + req.headers_mut() + .insert(header::AUTHORIZATION, self.authorization.parse().unwrap()); Ok(req) } - - #[inline] - pub async fn send(&self, req: Request) -> Result> { - self.client.send(req).await - } } #[async_trait] @@ -203,17 +201,13 @@ impl kv::Adapter for Adapter { } async fn get(&self, path: &str) -> Result>> { - let url = format!( - r"https://api.cloudflare.com/client/v4/accounts/{}/storage/kv/namespaces/{}", - self.account_id, self.namespace_id - ); - let url = format!("{}/values/{}", url, path); + let url = format!("{}/values/{}", self.url_prefix, path); let mut req = Request::get(&url); req = req.header(header::CONTENT_TYPE, "application/json"); let mut req = req .body(AsyncBody::Empty) .map_err(new_request_build_error)?; - req = self.load_token(req)?; + req = self.sign(req)?; let resp = self.client.send(req).await?; let status = resp.status(); match status { @@ -226,84 +220,41 @@ impl kv::Adapter for Adapter { } async fn set(&self, path: &str, value: &[u8]) -> Result<()> { - let url = format!( - r"https://api.cloudflare.com/client/v4/accounts/{}/storage/kv/namespaces/{}", - self.account_id, self.namespace_id - ); - let url = format!("{}/values/{}", url, path); - let mut req = Request::put(&url); - req = req.header(header::CONTENT_TYPE, "multipart/form-data"); + let url = format!("{}/values/{}", self.url_prefix, path); + let req = Request::put(&url); let multipart = Multipart::new(); let multipart = multipart .part(FormDataPart::new("metadata").content(serde_json::Value::Null.to_string())) .part(FormDataPart::new("value").content(value.to_vec())); let mut req = multipart.apply(req)?; - req = self.load_token(req)?; + req = self.sign(req)?; let resp = self.client.send(req).await?; let status = resp.status(); match status { - StatusCode::OK => { - let body = resp.into_body().bytes().await?; - let response: CfKvResponse = serde_json::from_slice(&body).map_err(|e| { - Error::new( - crate::ErrorKind::Unexpected, - &format!("failed to parse error response: {}", e), - ) - })?; - if !response.success { - return Err(Error::new( - crate::ErrorKind::Unexpected, - &String::from_utf8_lossy(&body), - )); - } - Ok(()) - } + StatusCode::OK => Ok(()), _ => Err(parse_error(resp).await?), } } async fn delete(&self, path: &str) -> Result<()> { - let url = format!( - r"https://api.cloudflare.com/client/v4/accounts/{}/storage/kv/namespaces/{}", - self.account_id, self.namespace_id - ); - let url = format!("{}/values/{}", url, path); + let url = format!("{}/values/{}", self.url_prefix, path); let mut req = Request::delete(&url); req = req.header(header::CONTENT_TYPE, "application/json"); let mut req = req .body(AsyncBody::Empty) .map_err(new_request_build_error)?; - req = self.load_token(req)?; + req = self.sign(req)?; let resp = self.client.send(req).await?; let status = resp.status(); match status { - StatusCode::OK => { - let body = resp.into_body().bytes().await?; - let response: CfKvResponse = serde_json::from_slice(&body).map_err(|e| { - Error::new( - crate::ErrorKind::Unexpected, - &format!("failed to parse error response: {}", e), - ) - })?; - if !response.success { - return Err(Error::new( - crate::ErrorKind::Unexpected, - &String::from_utf8_lossy(&body), - )); - } - Ok(()) - } + StatusCode::OK => Ok(()), _ => Err(parse_error(resp).await?), } } async fn scan(&self, path: &str) -> Result> { - let url = format!( - r"https://api.cloudflare.com/client/v4/accounts/{}/storage/kv/namespaces/{}", - self.account_id, self.namespace_id - ); - let mut url = format!("{}/keys", url); - if path.len() > 0 { + let mut url = format!("{}/keys", self.url_prefix); + if !path.is_empty() { url = format!("{}?prefix={}", url, path); } let mut req = Request::get(&url); @@ -311,7 +262,7 @@ impl kv::Adapter for Adapter { let mut req = req .body(AsyncBody::Empty) .map_err(new_request_build_error)?; - req = self.load_token(req)?; + req = self.sign(req)?; let resp = self.client.send(req).await?; let status = resp.status(); match status { @@ -323,12 +274,6 @@ impl kv::Adapter for Adapter { &format!("failed to parse error response: {}", e), ) })?; - if !response.success { - return Err(Error::new( - crate::ErrorKind::Unexpected, - &String::from_utf8_lossy(&body), - )); - } Ok(response.result.into_iter().map(|r| r.name).collect()) } _ => Err(parse_error(resp).await?), @@ -337,33 +282,33 @@ impl kv::Adapter for Adapter { } #[derive(Debug, Deserialize)] -pub(crate) struct CfKvResponse { - pub(crate) errors: Vec, - messages: Vec, - result: serde_json::Value, - success: bool, +pub struct CfKvResponse { + pub errors: Vec, + pub messages: Vec, + pub result: serde_json::Value, + pub success: bool, } #[derive(Debug, Deserialize)] -struct CfKvScanResponse { - errors: Vec, - messages: Vec, - result: Vec, - success: bool, - result_info: Option, +pub struct CfKvScanResponse { + pub errors: Vec, + pub messages: Vec, + pub result: Vec, + pub success: bool, + pub result_info: Option, } #[derive(Debug, Deserialize)] -struct CfKvScanResult { - expiration: i64, - name: String, - metadata: serde_json::Value, +pub struct CfKvScanResult { + pub expiration: i64, + pub name: String, + pub metadata: serde_json::Value, } #[derive(Debug, Deserialize)] -struct CfKvResultInfo { - count: i64, - cursor: String, +pub struct CfKvResultInfo { + pub count: i64, + pub cursor: String, } #[derive(Debug, Deserialize)] From 7f6e89b3aa86c897b44864dbf5c76711271be8ac Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Sun, 22 Oct 2023 19:29:01 +0800 Subject: [PATCH 4/8] fix clippy --- core/src/services/cloudflare_kv/backend.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/core/src/services/cloudflare_kv/backend.rs b/core/src/services/cloudflare_kv/backend.rs index 5e74e96d5224..5a621c4eeedc 100644 --- a/core/src/services/cloudflare_kv/backend.rs +++ b/core/src/services/cloudflare_kv/backend.rs @@ -359,14 +359,11 @@ mod test { response.result[0].metadata, serde_json::json!({"someMetadataKey": "someMetadataValue"}) ); - assert_eq!(response.success, true); + assert!(response.success); assert!(response.result_info.is_some()); - match response.result_info { - Some(result_info) => { - assert_eq!(result_info.count, 1); - assert_eq!(result_info.cursor, "6Ck1la0VxJ0djhidm1MdX2FyDGxLKVeeHZZmORS_8XeSuhz9SjIJRaSa2lnsF01tQOHrfTGAP3R5X1Kv5iVUuMbNKhWNAXHOl6ePB0TUL8nw"); - } - None => {} + if let Some(result_info) = response.result_info { + assert_eq!(result_info.count, 1); + assert_eq!(result_info.cursor, "6Ck1la0VxJ0djhidm1MdX2FyDGxLKVeeHZZmORS_8XeSuhz9SjIJRaSa2lnsF01tQOHrfTGAP3R5X1Kv5iVUuMbNKhWNAXHOl6ePB0TUL8nw"); } } @@ -384,6 +381,6 @@ mod test { assert_eq!(response.errors.len(), 0); assert_eq!(response.messages.len(), 0); assert_eq!(response.result, serde_json::json!({})); - assert_eq!(response.success, true); + assert!(response.success); } } From 01586b7c1800698a56cde67bfbebcdd621625bcb Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Sun, 22 Oct 2023 22:31:04 +0800 Subject: [PATCH 5/8] fix licenses --- core/src/services/cloudflare_kv/backend.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/services/cloudflare_kv/backend.rs b/core/src/services/cloudflare_kv/backend.rs index 5a621c4eeedc..0cfc9ef936bb 100644 --- a/core/src/services/cloudflare_kv/backend.rs +++ b/core/src/services/cloudflare_kv/backend.rs @@ -14,6 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; @@ -292,7 +293,8 @@ pub struct CfKvResponse { #[derive(Debug, Deserialize)] pub struct CfKvScanResponse { pub errors: Vec, - pub messages: Vec, + #[allow(dead_code)] + messages: Vec, pub result: Vec, pub success: bool, pub result_info: Option, From c71606b42251ed8e04cd4d4ab77660bf9635a650 Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Mon, 23 Oct 2023 11:28:26 +0800 Subject: [PATCH 6/8] fix: comment unused field --- core/src/services/cloudflare_kv/backend.rs | 89 +++++++++++----------- core/src/services/cloudflare_kv/error.rs | 4 +- 2 files changed, 46 insertions(+), 47 deletions(-) diff --git a/core/src/services/cloudflare_kv/backend.rs b/core/src/services/cloudflare_kv/backend.rs index 0cfc9ef936bb..0758bd91e824 100644 --- a/core/src/services/cloudflare_kv/backend.rs +++ b/core/src/services/cloudflare_kv/backend.rs @@ -283,47 +283,46 @@ impl kv::Adapter for Adapter { } #[derive(Debug, Deserialize)] -pub struct CfKvResponse { - pub errors: Vec, - pub messages: Vec, - pub result: serde_json::Value, - pub success: bool, +pub(crate) struct CfKvResponse { + pub(crate) errors: Vec, + // pub(crate) messages: Vec, + // pub(crate) result: serde_json::Value, + // pub(crate) success: bool, } #[derive(Debug, Deserialize)] -pub struct CfKvScanResponse { - pub errors: Vec, - #[allow(dead_code)] - messages: Vec, - pub result: Vec, - pub success: bool, - pub result_info: Option, +pub(crate) struct CfKvScanResponse { + // errors: Vec, + // messages: Vec, + result: Vec, + // success: bool, + // result_info: Option, } #[derive(Debug, Deserialize)] -pub struct CfKvScanResult { - pub expiration: i64, - pub name: String, - pub metadata: serde_json::Value, +struct CfKvScanResult { + // expiration: i64, + name: String, + // metadata: serde_json::Value, } -#[derive(Debug, Deserialize)] -pub struct CfKvResultInfo { - pub count: i64, - pub cursor: String, -} +// #[derive(Debug, Deserialize)] +// struct CfKvResultInfo { +// count: i64, +// cursor: String, +// } #[derive(Debug, Deserialize)] -pub struct CfKvError { - pub message: String, - pub code: i32, +pub(crate) struct CfKvError { + // pub(crate) message: String, + pub(crate) code: i32, } -#[derive(Debug, Deserialize)] -pub struct CfKvMessage { - pub message: String, - pub code: i32, -} +// #[derive(Debug, Deserialize)] +// pub(crate) struct CfKvMessage { +// pub(crate) message: String, +// pub(crate) code: i32, +// } #[cfg(test)] mod test { @@ -352,21 +351,21 @@ mod test { let response: CfKvScanResponse = serde_json::from_slice(json_str.as_bytes()).unwrap(); - assert_eq!(response.errors.len(), 0); - assert_eq!(response.messages.len(), 0); + // assert_eq!(response.errors.len(), 0); + // assert_eq!(response.messages.len(), 0); assert_eq!(response.result.len(), 1); - assert_eq!(response.result[0].expiration, 1577836800); + // assert_eq!(response.result[0].expiration, 1577836800); assert_eq!(response.result[0].name, "My-Key"); - assert_eq!( - response.result[0].metadata, - serde_json::json!({"someMetadataKey": "someMetadataValue"}) - ); - assert!(response.success); - assert!(response.result_info.is_some()); - if let Some(result_info) = response.result_info { - assert_eq!(result_info.count, 1); - assert_eq!(result_info.cursor, "6Ck1la0VxJ0djhidm1MdX2FyDGxLKVeeHZZmORS_8XeSuhz9SjIJRaSa2lnsF01tQOHrfTGAP3R5X1Kv5iVUuMbNKhWNAXHOl6ePB0TUL8nw"); - } + // assert_eq!( + // response.result[0].metadata, + // serde_json::json!({"someMetadataKey": "someMetadataValue"}) + // ); + // assert!(response.success); + // assert!(response.result_info.is_some()); + // if let Some(result_info) = response.result_info { + // assert_eq!(result_info.count, 1); + // assert_eq!(result_info.cursor, "6Ck1la0VxJ0djhidm1MdX2FyDGxLKVeeHZZmORS_8XeSuhz9SjIJRaSa2lnsF01tQOHrfTGAP3R5X1Kv5iVUuMbNKhWNAXHOl6ePB0TUL8nw"); + // } } #[test] @@ -381,8 +380,8 @@ mod test { let response: CfKvResponse = serde_json::from_slice(json_str.as_bytes()).unwrap(); assert_eq!(response.errors.len(), 0); - assert_eq!(response.messages.len(), 0); - assert_eq!(response.result, serde_json::json!({})); - assert!(response.success); + // assert_eq!(response.messages.len(), 0); + // assert_eq!(response.result, serde_json::json!({})); + // assert!(response.success); } } diff --git a/core/src/services/cloudflare_kv/error.rs b/core/src/services/cloudflare_kv/error.rs index 9b2c2ff941eb..10f4c4aaabf7 100644 --- a/core/src/services/cloudflare_kv/error.rs +++ b/core/src/services/cloudflare_kv/error.rs @@ -30,7 +30,7 @@ use super::backend::CfKvError; use super::backend::CfKvResponse; /// Parse error response into Error. -pub async fn parse_error(resp: Response) -> Result { +pub(crate) async fn parse_error(resp: Response) -> Result { let (parts, body) = resp.into_parts(); let bs = body.bytes().await?; @@ -66,7 +66,7 @@ pub async fn parse_error(resp: Response) -> Result { Ok(err) } -pub fn parse_cfkv_error_code(errors: Vec) -> Option<(ErrorKind, bool)> { +pub(crate) fn parse_cfkv_error_code(errors: Vec) -> Option<(ErrorKind, bool)> { if errors.is_empty() { return None; } From 4cc0698cb5249b9c68ae091eb3f6241c1f434f05 Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Mon, 23 Oct 2023 16:28:26 +0800 Subject: [PATCH 7/8] fix: remove unused field --- core/src/services/cloudflare_kv/backend.rs | 27 +--------------------- 1 file changed, 1 insertion(+), 26 deletions(-) diff --git a/core/src/services/cloudflare_kv/backend.rs b/core/src/services/cloudflare_kv/backend.rs index 0758bd91e824..6981a6e9bb47 100644 --- a/core/src/services/cloudflare_kv/backend.rs +++ b/core/src/services/cloudflare_kv/backend.rs @@ -285,25 +285,18 @@ impl kv::Adapter for Adapter { #[derive(Debug, Deserialize)] pub(crate) struct CfKvResponse { pub(crate) errors: Vec, - // pub(crate) messages: Vec, - // pub(crate) result: serde_json::Value, - // pub(crate) success: bool, } #[derive(Debug, Deserialize)] pub(crate) struct CfKvScanResponse { - // errors: Vec, - // messages: Vec, result: Vec, - // success: bool, + // According to https://developers.cloudflare.com/api/operations/workers-kv-namespace-list-a-namespace'-s-keys, result_info is used to determine if there are more keys to be listed // result_info: Option, } #[derive(Debug, Deserialize)] struct CfKvScanResult { - // expiration: i64, name: String, - // metadata: serde_json::Value, } // #[derive(Debug, Deserialize)] @@ -314,16 +307,9 @@ struct CfKvScanResult { #[derive(Debug, Deserialize)] pub(crate) struct CfKvError { - // pub(crate) message: String, pub(crate) code: i32, } -// #[derive(Debug, Deserialize)] -// pub(crate) struct CfKvMessage { -// pub(crate) message: String, -// pub(crate) code: i32, -// } - #[cfg(test)] mod test { use super::*; @@ -351,16 +337,8 @@ mod test { let response: CfKvScanResponse = serde_json::from_slice(json_str.as_bytes()).unwrap(); - // assert_eq!(response.errors.len(), 0); - // assert_eq!(response.messages.len(), 0); assert_eq!(response.result.len(), 1); - // assert_eq!(response.result[0].expiration, 1577836800); assert_eq!(response.result[0].name, "My-Key"); - // assert_eq!( - // response.result[0].metadata, - // serde_json::json!({"someMetadataKey": "someMetadataValue"}) - // ); - // assert!(response.success); // assert!(response.result_info.is_some()); // if let Some(result_info) = response.result_info { // assert_eq!(result_info.count, 1); @@ -380,8 +358,5 @@ mod test { let response: CfKvResponse = serde_json::from_slice(json_str.as_bytes()).unwrap(); assert_eq!(response.errors.len(), 0); - // assert_eq!(response.messages.len(), 0); - // assert_eq!(response.result, serde_json::json!({})); - // assert!(response.success); } } From 05974166e8e97d869eb1a7712139f676126700be Mon Sep 17 00:00:00 2001 From: my-vegetable-has-exploded Date: Mon, 23 Oct 2023 16:41:05 +0800 Subject: [PATCH 8/8] fmt --- core/src/services/cloudflare_kv/backend.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/services/cloudflare_kv/backend.rs b/core/src/services/cloudflare_kv/backend.rs index 6981a6e9bb47..e05bb3931e22 100644 --- a/core/src/services/cloudflare_kv/backend.rs +++ b/core/src/services/cloudflare_kv/backend.rs @@ -290,7 +290,7 @@ pub(crate) struct CfKvResponse { #[derive(Debug, Deserialize)] pub(crate) struct CfKvScanResponse { result: Vec, - // According to https://developers.cloudflare.com/api/operations/workers-kv-namespace-list-a-namespace'-s-keys, result_info is used to determine if there are more keys to be listed + // According to https://developers.cloudflare.com/api/operations/workers-kv-namespace-list-a-namespace'-s-keys, result_info is used to determine if there are more keys to be listed // result_info: Option, }