Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change: Break Change api of config plugin, support async fn. #163

Merged
merged 1 commit into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
### 0.3.0
- Refactor: tonic instead of tikv/grpc-rs
- Change: Break Change api of auth plugin, support async fn
- Change: Break Change api of config-filter plugin, support async fn
- Change: Break Change api of config-encryption plugin, support async fn
- TODO

### 0.2.6
Expand Down
22 changes: 16 additions & 6 deletions src/api/plugin/config_filter.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
/// ConfigFilter
#[async_trait::async_trait]
pub trait ConfigFilter: Send + Sync {
/// Filter the config_req or config_resp. You can modify their values as needed.
///
/// [`ConfigReq`] and [`ConfigResp`] will not be [`None`] at the same time.
/// Only one of [`ConfigReq`] and [`ConfigResp`] is [`Some`].
fn filter(&self, config_req: Option<&mut ConfigReq>, config_resp: Option<&mut ConfigResp>);
async fn filter(
&self,
config_req: Option<&mut ConfigReq>,
config_resp: Option<&mut ConfigResp>,
);
}

/// ConfigReq for [`ConfigFilter`]
Expand Down Expand Up @@ -88,8 +93,13 @@ mod tests {
}
}

#[async_trait::async_trait]
impl ConfigFilter for TestConfigEncryptionFilter {
fn filter(&self, config_req: Option<&mut ConfigReq>, config_resp: Option<&mut ConfigResp>) {
async fn filter(
&self,
config_req: Option<&mut ConfigReq>,
config_resp: Option<&mut ConfigResp>,
) {
if let Some(config_req) = config_req {
if !config_req.encrypted_data_key.is_empty() {
config_req.content =
Expand All @@ -106,8 +116,8 @@ mod tests {
}
}

#[test]
fn test_config_filter() {
#[tokio::test]
async fn test_config_filter() {
let config_filter = TestConfigEncryptionFilter {};

let (data_id, group, namespace, content, encrypted_data_key) = (
Expand All @@ -125,7 +135,7 @@ mod tests {
content.clone(),
encrypted_data_key.clone(),
);
config_filter.filter(Some(&mut config_req), None);
config_filter.filter(Some(&mut config_req), None).await;

assert_eq!(config_req.content, encrypted_data_key + content.as_str());

Expand All @@ -136,7 +146,7 @@ mod tests {
config_req.content.clone(),
config_req.encrypted_data_key.clone(),
);
config_filter.filter(None, Some(&mut config_resp));
config_filter.filter(None, Some(&mut config_resp)).await;

assert_eq!(config_resp.content, content);
}
Expand Down
79 changes: 50 additions & 29 deletions src/api/plugin/encryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub const DEFAULT_CIPHER_PREFIX: &str = "cipher-";
pub const DEFAULT_CIPHER_SPLIT: &str = "-";

/// EncryptionPlugin for Config.
#[async_trait::async_trait]
pub trait EncryptionPlugin: Send + Sync {
/**
* Whether need to do cipher.
Expand Down Expand Up @@ -42,7 +43,7 @@ pub trait EncryptionPlugin: Send + Sync {
* @param content content unencrypted
* @return encrypt value
*/
fn encrypt(&self, secret_key: &str, content: &str) -> String;
async fn encrypt(&self, secret_key: &str, content: &str) -> String;

/**
* Decrypt interface.
Expand All @@ -51,14 +52,14 @@ pub trait EncryptionPlugin: Send + Sync {
* @param content encrypted
* @return decrypt value
*/
fn decrypt(&self, secret_key: &str, content: &str) -> String;
async fn decrypt(&self, secret_key: &str, content: &str) -> String;

/**
* Generate secret key. It only be known by you.
*
* @return Secret key
*/
fn generate_secret_key(&self) -> String;
async fn generate_secret_key(&self) -> String;

/**
* Algorithm name. e.g. AES,AES128,AES256,DES,3DES,...
Expand All @@ -73,15 +74,15 @@ pub trait EncryptionPlugin: Send + Sync {
* @param secretKey secretKey
* @return encrypted secretKey
*/
fn encrypt_secret_key(&self, secret_key: &str) -> String;
async fn encrypt_secret_key(&self, secret_key: &str) -> String;

/**
* Decrypt secret Key.
*
* @param secret_key secretKey
* @return decrypted secretKey
*/
fn decrypt_secret_key(&self, secret_key: &str) -> String;
async fn decrypt_secret_key(&self, secret_key: &str) -> String;
}

/// ConfigEncryptionFilter handle with [`EncryptionPlugin`]
Expand All @@ -95,18 +96,23 @@ impl ConfigEncryptionFilter {
}
}

#[async_trait::async_trait]
impl ConfigFilter for ConfigEncryptionFilter {
fn filter(&self, config_req: Option<&mut ConfigReq>, config_resp: Option<&mut ConfigResp>) {
async fn filter(
&self,
config_req: Option<&mut ConfigReq>,
config_resp: Option<&mut ConfigResp>,
) {
// Publish configuration, encrypt
if let Some(config_req) = config_req {
for plugin in &self.encryption_plugins {
if !plugin.need_cipher(&config_req.data_id) {
continue;
}

let secret_key = plugin.generate_secret_key();
let encrypted_content = plugin.encrypt(&secret_key, &config_req.content);
let encrypted_secret_key = plugin.encrypt_secret_key(&secret_key);
let secret_key = plugin.generate_secret_key().await;
let encrypted_content = plugin.encrypt(&secret_key, &config_req.content).await;
let encrypted_secret_key = plugin.encrypt_secret_key(&secret_key).await;

// set encrypted data.
config_req.encrypted_data_key = encrypted_secret_key;
Expand All @@ -127,9 +133,11 @@ impl ConfigFilter for ConfigEncryptionFilter {
let encrypted_secret_key = &config_resp.encrypted_data_key;
let encrypted_content = &config_resp.content;

let decrypted_secret_key = plugin.decrypt_secret_key(encrypted_secret_key);
let decrypted_content =
plugin.decrypt(&decrypted_secret_key, encrypted_content);
let decrypted_secret_key =
plugin.decrypt_secret_key(encrypted_secret_key).await;
let decrypted_content = plugin
.decrypt(&decrypted_secret_key, encrypted_content)
.await;

// set decrypted data.
config_resp.content = decrypted_content;
Expand All @@ -148,34 +156,35 @@ mod tests {

struct TestEncryptionPlugin;

#[async_trait::async_trait]
impl EncryptionPlugin for TestEncryptionPlugin {
fn encrypt(&self, secret_key: &str, content: &str) -> String {
async fn encrypt(&self, secret_key: &str, content: &str) -> String {
secret_key.to_owned() + content
}

fn decrypt(&self, secret_key: &str, content: &str) -> String {
async fn decrypt(&self, secret_key: &str, content: &str) -> String {
content.replace(secret_key, "")
}

fn generate_secret_key(&self) -> String {
async fn generate_secret_key(&self) -> String {
"secret-key".to_string()
}

fn algorithm_name(&self) -> String {
"TEST".to_string()
}

fn encrypt_secret_key(&self, secret_key: &str) -> String {
async fn encrypt_secret_key(&self, secret_key: &str) -> String {
"crypt_".to_owned() + secret_key
}

fn decrypt_secret_key(&self, secret_key: &str) -> String {
async fn decrypt_secret_key(&self, secret_key: &str) -> String {
secret_key.replace("crypt_", "")
}
}

#[test]
fn test_config_encryption_filters_empty() {
#[tokio::test]
async fn test_config_encryption_filters_empty() {
let config_encryption_filter = ConfigEncryptionFilter::new(vec![]);

let (data_id, group, namespace, content, encrypted_data_key) = (
Expand All @@ -193,7 +202,9 @@ mod tests {
content.clone(),
encrypted_data_key.clone(),
);
config_encryption_filter.filter(Some(&mut config_req), None);
config_encryption_filter
.filter(Some(&mut config_req), None)
.await;

assert_eq!(config_req.content, encrypted_data_key + content.as_str());

Expand All @@ -204,13 +215,15 @@ mod tests {
config_req.content.clone(),
config_req.encrypted_data_key.clone(),
);
config_encryption_filter.filter(None, Some(&mut config_resp));
config_encryption_filter
.filter(None, Some(&mut config_resp))
.await;

assert_eq!(config_resp.content, content);
}

#[test]
fn test_config_encryption_filters() {
#[tokio::test]
async fn test_config_encryption_filters() {
let config_encryption_filter =
ConfigEncryptionFilter::new(vec![Box::new(TestEncryptionPlugin {})]);

Expand All @@ -229,7 +242,9 @@ mod tests {
content.clone(),
encrypted_data_key.clone(),
);
config_encryption_filter.filter(Some(&mut config_req), None);
config_encryption_filter
.filter(Some(&mut config_req), None)
.await;

let mut config_resp = ConfigResp::new(
config_req.data_id.clone(),
Expand All @@ -238,13 +253,15 @@ mod tests {
config_req.content.clone(),
config_req.encrypted_data_key.clone(),
);
config_encryption_filter.filter(None, Some(&mut config_resp));
config_encryption_filter
.filter(None, Some(&mut config_resp))
.await;

assert_eq!(config_resp.content, content);
}

#[test]
fn test_config_encryption_filters_not_need_cipher() {
#[tokio::test]
async fn test_config_encryption_filters_not_need_cipher() {
let config_encryption_filter =
ConfigEncryptionFilter::new(vec![Box::new(TestEncryptionPlugin {})]);

Expand All @@ -263,7 +280,9 @@ mod tests {
content.clone(),
encrypted_data_key.clone(),
);
config_encryption_filter.filter(Some(&mut config_req), None);
config_encryption_filter
.filter(Some(&mut config_req), None)
.await;

let mut config_resp = ConfigResp::new(
config_req.data_id.clone(),
Expand All @@ -272,7 +291,9 @@ mod tests {
config_req.content.clone(),
config_req.encrypted_data_key.clone(),
);
config_encryption_filter.filter(None, Some(&mut config_resp));
config_encryption_filter
.filter(None, Some(&mut config_resp))
.await;

assert_eq!(config_resp.content, content);
}
Expand Down
8 changes: 4 additions & 4 deletions src/config/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl CacheData {
}

/// Notify listener. when last-md5 not equals the-newest-md5
pub fn notify_listener(&mut self) {
pub async fn notify_listener(&mut self) {
tracing::info!(
"notify_listener, dataId={},group={},namespace={},md5={}",
self.data_id,
Expand All @@ -88,7 +88,7 @@ impl CacheData {
self.md5
);

let config_resp = self.get_config_resp_after_filter();
let config_resp = self.get_config_resp_after_filter().await;

if let Ok(mut mutex) = self.listeners.lock() {
for listen_wrap in mutex.iter_mut() {
Expand All @@ -107,7 +107,7 @@ impl CacheData {
}

/// inner method, will invoke config_filter
fn get_config_resp_after_filter(&self) -> ConfigResponse {
async fn get_config_resp_after_filter(&self) -> ConfigResponse {
let mut conf_resp = ConfigResp::new(
self.data_id.clone(),
self.group.clone(),
Expand All @@ -116,7 +116,7 @@ impl CacheData {
self.encrypted_data_key.clone(),
);
for config_filter in self.config_filters.iter() {
config_filter.filter(None, Some(&mut conf_resp));
config_filter.filter(None, Some(&mut conf_resp)).await;
}

ConfigResponse::new(
Expand Down
Loading