Skip to content

Commit

Permalink
[ISSUE #74] 🚩 Namesrv support wipe write perm of boker(request code 205)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Jan 21, 2024
1 parent 4c966f4 commit 0a8dd6f
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 1 deletion.
17 changes: 17 additions & 0 deletions rocketmq-namesrv/src/processor/default_request_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ use rocketmq_remoting::{
DeleteKVConfigRequestHeader, GetKVConfigRequestHeader, GetKVConfigResponseHeader,
PutKVConfigRequestHeader,
},
perm_broker_header::{
WipeWritePermOfBrokerRequestHeader, WipeWritePermOfBrokerResponseHeader,
},
query_data_version_header::{
QueryDataVersionRequestHeader, QueryDataVersionResponseHeader,
},
Expand Down Expand Up @@ -77,6 +80,7 @@ impl RequestProcessor for DefaultRequestProcessor {
Some(RequestCode::GetBrokerMemberGroup) => self.get_broker_member_group(request),
//handle get broker cluster info
Some(RequestCode::GetBrokerClusterInfo) => self.get_broker_cluster_info(request),
Some(RequestCode::WipeWritePermOfBroker) => self.wipe_write_perm_of_broker(request),

_ => RemotingCommand::create_response_command_with_code(
RemotingSysResponseCode::SystemError,
Expand Down Expand Up @@ -316,6 +320,19 @@ impl DefaultRequestProcessor {
RemotingCommand::create_response_command_with_code(RemotingSysResponseCode::Success)
.set_body(Some(Bytes::from(vec)))
}

fn wipe_write_perm_of_broker(&mut self, request: RemotingCommand) -> RemotingCommand {
let request_header = request
.decode_command_custom_header::<WipeWritePermOfBrokerRequestHeader>()
.unwrap();
let wipe_topic_cnt = self
.route_info_manager
.write()
.wipe_write_perm_of_broker_by_lock(request_header.broker_name.as_str());
RemotingCommand::create_response_command().set_command_custom_header(Some(Box::new(
WipeWritePermOfBrokerResponseHeader::new(wipe_topic_cnt),
)))
}
}

fn extract_register_topic_config_from_request(
Expand Down
28 changes: 28 additions & 0 deletions rocketmq-namesrv/src/route/route_info_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,4 +643,32 @@ impl RouteInfoManager {
}
None
}

pub(crate) fn wipe_write_perm_of_broker_by_lock(&mut self, broker_name: &str) -> i32 {
self.operate_write_perm_of_broker(broker_name, RequestCode::WipeWritePermOfBroker)
}

fn operate_write_perm_of_broker(
&mut self,
broker_name: &str,
request_code: RequestCode,
) -> i32 {
let mut topic_cnt = 0;
for (_topic, qd_map) in self.topic_queue_table.iter_mut() {
let qd = qd_map.get_mut(broker_name).unwrap();
let mut perm = qd.perm;
match request_code {
RequestCode::WipeWritePermOfBroker => {
perm &= !PermName::PERM_WRITE as u32;
}
RequestCode::AddWritePermOfBroker => {
perm = (PermName::PERM_READ | PermName::PERM_WRITE) as u32;
}
_ => {}
}
qd.perm = perm;
topic_cnt += 1;
}
topic_cnt
}
}
1 change: 1 addition & 0 deletions rocketmq-remoting/src/protocol/header/namesrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@
pub mod broker_request;
pub mod brokerid_change_request_header;
pub mod kv_config_header;
pub mod perm_broker_header;
pub mod query_data_version_header;
pub mod register_broker_header;
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

use crate::protocol::command_custom_header::{CommandCustomHeader, FromMap};

#[derive(Debug, Clone, Deserialize, Serialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct WipeWritePermOfBrokerRequestHeader {
pub broker_name: String,
}

impl WipeWritePermOfBrokerRequestHeader {
const BROKER_NAME: &'static str = "brokerName";
pub fn new(broker_name: impl Into<String>) -> Self {
Self {
broker_name: broker_name.into(),
}
}
}

impl CommandCustomHeader for WipeWritePermOfBrokerRequestHeader {
fn to_map(&self) -> Option<HashMap<String, String>> {
Some(HashMap::from([(
Self::BROKER_NAME.to_string(),
self.broker_name.clone(),
)]))
}
}

impl FromMap for WipeWritePermOfBrokerRequestHeader {
type Target = Self;

fn from(map: &HashMap<String, String>) -> Option<Self::Target> {
Some(WipeWritePermOfBrokerRequestHeader {
broker_name: map
.get(WipeWritePermOfBrokerRequestHeader::BROKER_NAME)
.cloned()
.unwrap_or_default(),
})
}
}

#[derive(Debug, Clone, Deserialize, Serialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct WipeWritePermOfBrokerResponseHeader {
pub wipe_topic_count: i32,
}

impl WipeWritePermOfBrokerResponseHeader {
const WIPE_TOPIC_COUNT: &'static str = "wipeTopicCount";

pub fn new(wipe_topic_count: i32) -> Self {
Self { wipe_topic_count }
}
}

impl CommandCustomHeader for WipeWritePermOfBrokerResponseHeader {
fn to_map(&self) -> Option<HashMap<String, String>> {
Some(HashMap::from([(
Self::WIPE_TOPIC_COUNT.to_string(),
self.wipe_topic_count.to_string(),
)]))
}
}

impl FromMap for WipeWritePermOfBrokerResponseHeader {
type Target = Self;

fn from(map: &HashMap<String, String>) -> Option<Self::Target> {
Some(WipeWritePermOfBrokerResponseHeader {
wipe_topic_count: map
.get(WipeWritePermOfBrokerResponseHeader::WIPE_TOPIC_COUNT)
.and_then(|s| s.parse::<i32>().ok())
.unwrap_or(0),
})
}
}
2 changes: 1 addition & 1 deletion rocketmq-remoting/src/protocol/route/route_data_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub struct QueueData {
read_queue_nums: u32,
#[serde(rename = "writeQueueNums")]
write_queue_nums: u32,
perm: u32,
pub perm: u32,
#[serde(rename = "topicSysFlag")]
topic_sys_flag: u32,
}
Expand Down

0 comments on commit 0a8dd6f

Please sign in to comment.